flink-user mailing list archives

From Theo Diefenthal <theo.diefent...@scoop-software.de>
Subject Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
Date Thu, 10 Oct 2019 11:54:45 GMT
Hi Filip, 

My point was not about the computation of the "maximum". My point was: you could hopefully
read the stream sequentially and just assign punctuated watermarks to it. Once you have assigned
the watermarks properly (and before you do your expensive computation, in this case parsing
the entire event and building the sum), you could tell Flink to repartition/key the data and
shuffle it over the network to the worker tasks, so that the downstream operations are performed
in parallel. As far as I know, Flink will then take care of handling the watermarks internally,
and everything is fine.
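A minimal plain-Java model of this idea follows. It is not Flink API: all class and method names are hypothetical, and a null list entry stands for a Barrier. A single sequential stage counts barriers and uses the count as each value's timestamp. Values are spread round-robin over N parallel "subtasks", and each barrier's watermark is broadcast to every subtask, which is, as far as I understand, what Flink does with watermarks when a stream is repartitioned. Each subtask keeps per-timestamp partial sums and flushes those at or below the incoming watermark, and the flushed partials are added up per timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model (not Flink API). A null entry in the input stands for a Barrier.
final class WatermarkBroadcastModel {

    // One hypothetical downstream subtask.
    static final class Subtask {
        // timestamp -> partial sum of the values this subtask received for it
        private final TreeMap<Long, Integer> pending = new TreeMap<>();

        void onRecord(long timestamp, int value) {
            pending.merge(timestamp, value, Integer::sum);
        }

        // Flush every partial sum whose timestamp is <= the watermark into the output.
        void onWatermark(long watermark, Map<Long, Integer> output) {
            NavigableMap<Long, Integer> ready = pending.headMap(watermark, true);
            for (Map.Entry<Long, Integer> e : ready.entrySet()) {
                output.merge(e.getKey(), e.getValue(), Integer::sum);
            }
            ready.clear(); // the view is backed by 'pending', so this drops the flushed entries
        }
    }

    // Returns the per-segment sums in segment order.
    static List<Integer> segmentSums(List<Integer> input, int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        Map<Long, Integer> merged = new TreeMap<>();
        long barrierCount = 0; // state of the single sequential stage
        int next = 0;          // round-robin pointer for values
        for (Integer item : input) {
            if (item == null) {
                // Barrier: the watermark equals the current count and goes to every subtask.
                for (Subtask s : subtasks) {
                    s.onWatermark(barrierCount, merged);
                }
                barrierCount++;
            } else {
                subtasks.get(next).onRecord(barrierCount, item);
                next = (next + 1) % parallelism;
            }
        }
        return new ArrayList<>(merged.values());
    }
}
```

With the thread's test input, `segmentSums(Arrays.asList(1, null, 3, -1, null), 2)` gives the per-segment sums 1 and 2 for any parallelism. The parallel subtasks never see the barriers themselves, only the broadcast watermarks.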
I think it is a rare use case to have a sequential stream which cannot simply be read
sequentially. If the stream is so large that a single host can't do "read, extract the special
events, shuffle over the network to other tasks", you probably have a larger issue and need
to rethink things at the source level already, e.g. change the serialization method to something
that allows really lightweight parsing for finding the special events.
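Here is a sketch of the "lightweight parsing" idea under a hypothetical text encoding in which a barrier record is "B" and a value record is "V:<int>". The sequential reader inspects only the first character of each record; the payload parse (trivial here, but possibly expensive in a real format) runs on a thread pool. The encoding and all names are made up for illustration, and the thread pool merely stands in for Flink's parallel downstream tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical encoding: "B" is a barrier record, "V:<int>" is a value record.
final class CheapScanThenParallelParse {

    // Cheap check done sequentially: look at the first character only.
    static boolean isBarrier(String record) {
        return !record.isEmpty() && record.charAt(0) == 'B';
    }

    // Stand-in for an expensive deserialization step.
    static int parseValue(String record) {
        return Integer.parseInt(record.substring(2));
    }

    // Returns the sum of each barrier-terminated segment, in stream order.
    static List<Integer> segmentSums(List<String> records, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<List<Future<Integer>>> segments = new ArrayList<>();
            List<Future<Integer>> current = new ArrayList<>();
            for (String record : records) {
                if (isBarrier(record)) {
                    segments.add(current); // the barrier closes the current segment
                    current = new ArrayList<>();
                } else {
                    Callable<Integer> parse = () -> parseValue(record);
                    current.add(pool.submit(parse)); // full parse runs off the reading thread
                }
            }
            // Values after the last barrier form an unfinished segment and are ignored.
            List<Integer> sums = new ArrayList<>();
            for (List<Future<Integer>> segment : segments) {
                int sum = 0;
                for (Future<Integer> parsed : segment) {
                    try {
                        sum += parsed.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new IllegalStateException("parse task failed", e);
                    }
                }
                sums.add(sum);
            }
            return sums;
        } finally {
            pool.shutdown();
        }
    }
}
```

The design choice is that ordering is decided entirely by the cheap sequential scan, so the parallel part can finish in any order without affecting which segment a value belongs to.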

Best regards 

From: "Filip Niksic" <fniksic@seas.upenn.edu>
To: "Theo Diefenthal" <theo.diefenthal@scoop-software.de>
CC: "user" <user@flink.apache.org>
Sent: Thursday, 10 October 2019 00:08:38
Subject: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Hi Theo, 

It is a single sequential stream. 

If I read your response correctly, you are arguing that summing a bunch of numbers is not
much more computationally intensive than assigning timestamps to those numbers, so if the
latter has to be done sequentially anyway, then why should the former be done in parallel?
To that I can only say that the example I gave is intentionally simple in order to make the
problem conceptually clean. By understanding the conceptually clean version of the problem,
we also gain insight into messier realistic versions where the operations we want to parallelize
may be much more computationally intensive. 


On Wed, Oct 9, 2019 at 1:28 PM theo.diefenthal@scoop-software.de <theo.diefenthal@scoop-software.de> wrote:

Hi Filip, I don't really understand your problem here. 
Do you have a source with a single sequential stream, where from time to time, there is a
barrier element? Or do you have a source like Kafka with multiple partitions? 
If you have case 2 with multiple partitions, what exactly do you mean by "order matters"?
Will each partition have its own barrier? Or do you have just one barrier for all partitions?
In that case, you will naturally have an ordering problem if your events themselves contain
no time data.
If you have a "sequential source", why do you need parallelism? Wouldn't it work to read that
partition's data in one task (possibly skipping deserialization as much as possible so as to
only recognize barrier events) and then add a downstream task with higher parallelism that
does the full deserialization and other work?
Best regards
Theo
-------- Original Message --------
Subject: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
From: Yun Gao
To: Filip Niksic, user
Cc: Chesnay Schepler 

Hi Filip, 

Overall, I also think that to increase the parallelism of the reduce beyond 1, we should use
a parallel window to compute the partial sums and then add up the partial sums with windowAll.

As for assignTimestampsAndWatermarks, I think the current usage should be OK; it works the
same way as for the other operators. Besides, for the keyBy partitioner, I think "% PARALLELISM"
is not necessary, since Flink will take care of the parallelism. In other words, I think you
can use .keyBy(x -> x.getId()) directly.
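To illustrate why the modulo is unnecessary: to my understanding, Flink maps each key to a key group (a hash of the key modulo the max parallelism, 128 by default for small jobs) and each key group to a subtask via keyGroup * parallelism / maxParallelism (see KeyGroupRangeAssignment). The sketch below models that two-step routing with a stand-in hash mixer, NOT Flink's actual hash, so the exact subtask indices differ from Flink's. The class and method names are hypothetical.

```java
// Model of key -> key group -> subtask routing. mix() is a stand-in hash, not Flink's.
final class KeyGroupModel {

    // 32-bit integer mixer (the MurmurHash3 finalizer), used only for illustration.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    static int keyGroup(Object key, int maxParallelism) {
        // floorMod keeps the result non-negative for negative hashes.
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Contiguous ranges of key groups are assigned to consecutive subtasks.
    static int subtask(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Counts how many of the ids 0..n-1 land on each subtask.
    static int[] load(int n, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (long id = 0; id < n; id++) {
            counts[subtask(id, maxParallelism, parallelism)]++;
        }
        return counts;
    }
}
```

For example, `load(1000, 128, 4)` shows 1000 sequential ids spreading over four subtasks with no `% PARALLELISM` in the key. With ids taken modulo a fixed PARALLELISM, by contrast, only PARALLELISM distinct keys exist: raising the job's parallelism above that constant leaves subtasks idle, and even at equal parallelism two keys may hash to the same subtask.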


From: Filip Niksic <fniksic@seas.upenn.edu>
Send Time: 2019 Oct. 9 (Wed.) 12:21
To: user <user@flink.apache.org>
Cc: Yun Gao <yungao.gy@aliyun.com>; Chesnay Schepler <chesnay@apache.org>
Subject: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

Here is the solution I currently have. It turned out to be more complicated than I expected.
It would be great if a more experienced Flink user could comment and point out the shortcomings.
And if you have other ideas for achieving the same thing, let me know! 

Let's start like in the original email, except now we set the time characteristic to EventTime
and parallelism to a constant named PARALLELISM. 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

final int PARALLELISM = 2;
env.setParallelism(PARALLELISM);

DataStream<DataItem> stream = env.fromElements(DataItem.class,
    new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

The first step is to use a punctuation-based timestamp-and-watermark assigner as follows.
We keep track of the number of barriers in the stream. We assign a timestamp n to the n-th
barrier (counting from 0) and to all the values that immediately precede it, and we emit a
watermark with timestamp n on the n-th barrier. This allows us to define 1 millisecond tumbling
windows that precisely capture the values between two barriers.

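DataStream<DataItem> timedStream =
    stream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<DataItem>() {
        private long barrierCount = 0;

        @Override
        public long extractTimestamp(DataItem item, long previousTimestamp) {
            // A barrier takes the current count and then advances it, so the n-th
            // barrier and the values preceding it share timestamp n.
            if (item instanceof Barrier) {
                return barrierCount++;
            }
            return barrierCount;
        }

        @Override
        public Watermark checkAndGetNextWatermark(DataItem item, long extractedTimestamp) {
            // Only barriers produce a watermark; null means "no watermark".
            if (item instanceof Barrier) {
                return new Watermark(extractedTimestamp);
            }
            return null;
        }
    }).setParallelism(1); // stateful: must see the whole stream in order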

In the test input stream, the first value and barrier get timestamp 0, and the next two
values and the final barrier get timestamp 1. Two watermarks, with timestamps 0 and 1, are
emitted, one on each barrier.
To achieve parallelization, we partition the values by artificially generated keys. A value's
key is based on its position in the stream, so we first wrap the values into a type that contains
this information. 

class ValueWithId {
    private final int val;
    private final long id;

    public ValueWithId(int val, long id) {
        this.val = val;
        this.id = id;
    }

    public int getVal() { return val; }
    public long getId() { return id; }
}

Here is the mapping. At the same time we can drop the barriers, since we no longer need them.
Note that we need to explicitly set the mapping operator's parallelism to 1, since the operator
is stateful. 

DataStream<ValueWithId> wrappedStream =
    timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
        private long count = 0L;

        @Override
        public void flatMap(DataItem item, Collector<ValueWithId> collector) throws Exception {
            // Barriers are dropped; each value gets the next sequential id.
            if (item instanceof Value) {
                int val = ((Value) item).getVal();
                collector.collect(new ValueWithId(val, count++));
            }
        }
    }).setParallelism(1); // stateful counter: must run in a single subtask
Now we're ready to do the key-based partitioning. A value's key is its id as assigned above
modulo PARALLELISM. We follow the partitioning by splitting the stream into 1 millisecond
tumbling windows. Then we simply aggregate the partial sums, first for each key separately
(and importantly, in parallel), and then for each window. 

DataStream<Integer> partialSums = wrappedStream
    .keyBy(x -> x.getId() % PARALLELISM)
    .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
    .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
        @Override
        public Integer createAccumulator() { return 0; }

        @Override
        public Integer add(ValueWithId valueWithId, Integer acc) { return acc + valueWithId.getVal(); }

        @Override
        public Integer getResult(Integer acc) { return acc; }

        @Override
        public Integer merge(Integer acc1, Integer acc2) { return acc1 + acc2; }
    })
    .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
    .reduce((x, y) -> x + y);

Finally, in the original problem I asked for cumulative sums since the start of the stream,
so we perform the last set of transformations to achieve that. 

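DataStream<Integer> cumulativeSums = partialSums
    .keyBy(x -> 0) // DataStream has no reduce(); a constant key feeds one rolling reduce
    .reduce((x, y) -> x + y);

cumulativeSums.print();
env.execute();
// We should see 1 followed by 3 as output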
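The whole pipeline can be modeled in plain Java to check the expected output. This is a sketch, not Flink code: null stands for a Barrier, the class name is hypothetical, and a window counts as fired once a later barrier's watermark has reached it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the pipeline above (not Flink code). null stands for a Barrier.
final class TwoLevelSumModel {

    static List<Integer> cumulativeSums(List<Integer> input, int parallelism) {
        // window start (= barrier count) -> (key -> partial sum)
        TreeMap<Long, Map<Long, Integer>> partials = new TreeMap<>();
        long barrierCount = 0; // timestamp assigner state
        long nextId = 0;       // id-assigning flatMap state
        for (Integer item : input) {
            if (item == null) {
                barrierCount++; // barriers advance time and are then dropped
                continue;
            }
            long key = nextId++ % parallelism; // keyBy(x -> x.getId() % PARALLELISM)
            partials.computeIfAbsent(barrierCount, w -> new TreeMap<>())
                    .merge(key, item, Integer::sum); // keyed window aggregate
        }
        List<Integer> result = new ArrayList<>();
        int running = 0;
        // The last watermark is barrierCount - 1, so only windows starting before barrierCount fire.
        for (Map<Long, Integer> perKey : partials.headMap(barrierCount).values()) {
            int windowTotal = 0; // timeWindowAll(...).reduce(...)
            for (int partial : perKey.values()) {
                windowTotal += partial;
            }
            running += windowTotal; // keyBy(x -> 0).reduce(...)
            result.add(running);
        }
        return result;
    }
}
```

One behavioral difference from the original sequential FlatMapFunction may be worth noting. If two barriers follow each other with no values in between, the window between them is empty, so (as far as I know) no window fires and no sum is emitted for that barrier, whereas the original program would emit the unchanged sum again. For example, the input 1, Barrier, Barrier, 2, Barrier gives 1, 3 here instead of 1, 1, 3.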

I am not completely sure if my usage of state in the timestamp-and-watermark assigner and
the mapper is correct. Is it possible for Flink to duplicate the assigner, move it around
and somehow mess up the timestamps? Likewise, is it possible for things to go wrong with the

Another concern I have is that my key-based partitions depend on the constant PARALLELISM.
Ideally, the program should be flexible about the parallelism that happens to be available
during runtime. 

Finally, if anyone notices that I am in any part reinventing the wheel and that Flink already
has a feature implementing some of the above, or that something can be done more elegantly,
let me know! 

Best regards, 


On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic <fniksic@seas.upenn.edu> wrote:

Hi Chesnay, 

Thanks for the reply. While your solution ultimately does use multiple partitions, from what
I can tell the underlying processing is still sequential. Imagine a stream where barriers
are quite rare, say one barrier after every million values. Then these million values all
end up in the same partition and are added up sequentially, and while they are being processed,
the other partitions are waiting for their turn. A truly parallel solution would partition
the million values, process each partition in parallel to get the partial sums, and on each
barrier aggregate the partial sums into a total sum.


On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <chesnay@apache.org> wrote:
In other words, you need a way to partition the stream such that a series of items followed
by a barrier is never interrupted.

I'm wondering whether you could just apply DataStream#partitionCustom to your source: 
public static class BarrierPartitioner implements Partitioner<DataItem> {

    private int currentPartition = 0;

    @Override
    public int partition(DataItem key, int numPartitions) {
        if (key instanceof Barrier) {
            // The barrier closes the current segment; later items go to the next partition.
            int partitionToReturn = currentPartition;
            currentPartition = (currentPartition + 1) % numPartitions;
            return partitionToReturn;
        } else {
            return currentPartition;
        }
    }
}

DataStream<DataItem> stream = ...;
DataStream<DataItem> partitionedStream = stream.partitionCustom(new BarrierPartitioner(), item -> item);
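To see the concern raised in the reply above concretely, here is a plain-Java replay of this routing rule. It is not Flink code: null stands for a Barrier and the class name is hypothetical. It returns the partition chosen for each item; every value of a segment lands on the same partition as the barrier that closes it, so one subtask sums the whole segment while the others wait.

```java
import java.util.ArrayList;
import java.util.List;

// Replays the BarrierPartitioner routing rule outside Flink. null stands for a Barrier.
final class BarrierPartitionerReplay {

    static List<Integer> route(List<Integer> items, int numPartitions) {
        List<Integer> partitions = new ArrayList<>();
        int currentPartition = 0;
        for (Integer item : items) {
            // Values and the closing barrier both go to the current partition.
            partitions.add(currentPartition);
            if (item == null) {
                // After a barrier, the next segment moves to the next partition.
                currentPartition = (currentPartition + 1) % numPartitions;
            }
        }
        return partitions;
    }
}
```

For the test input with two partitions, the routing is 0, 0, 1, 1, 1: the first segment goes entirely to partition 0 and the second entirely to partition 1.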

On 08/10/2019 14:55, Filip Niksic wrote: 
Hi Yun, 

The behavior with increased parallelism should be the same as with no parallelism. In other
words, for the input from the previous email, the output should always be 1, 3, regardless
of parallelism. Operationally, the partial sums maintained in each subtask should somehow
be aggregated before they are output. 

To answer the second question, I know that watermarks provide the same functionality. Is there
some way to convert the input with explicit punctuation into one with watermarks? I see there
is an interface called AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm
not sure how this assigner would be used. For example, it could maintain the number of previously
seen Barriers and assign this number as a watermark to each Value, but then this number becomes
the state that needs to be shared between multiple substreams. Or perhaps the Barriers can
somehow be duplicated and sent to each substream? Alternatively, is there some notion of event-based
windows that would be triggered by specific user-defined elements in the stream? In such a
mechanism, perhaps the watermarks would be used internally but would not be explicitly exposed
to the user?

Best regards, 


On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao.gy@aliyun.com> wrote:

Hi Filip, 
I have one question about the problem: what is the expected behavior when the parallelism of
the FlatMapFunction is increased beyond 1? Should each subtask maintain the partial sum of all
the values it has received and, whenever a barrier is received, just output the partial sum
of the values it received?

Another question: I think Flink's watermark mechanism provides functionality similar to
punctuation, so is it possible to implement the same logic directly with Flink windows?

From: Filip Niksic <fniksic@seas.upenn.edu>
Send Time: 2019 Oct. 8 (Tue.) 08:56
To: user <user@flink.apache.org>
Subject: [QUESTION] How to parallelize with explicit punctuation in Flink?

Hi all, 
What would be a natural way to implement a parallel version of the following Flink program?

Suppose I have a stream of items of type DataItem with two concrete implementations of DataItem:
Value and Barrier. Let's say that Value holds an integer value, and Barrier acts as explicit
punctuation.

public interface DataItem {}

public class Value implements DataItem {
    private final int val;
    public Value(int val) { this.val = val; }
    public int getVal() { return val; }
}

public class Barrier implements DataItem {}
The program should maintain a sum of values seen since the beginning of the stream. On each
Barrier, the program should output the sum seen so far. 
An obvious way to implement this would be with a FlatMapFunction, maintaining the sum as state
and emitting it on each Barrier. 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<DataItem> stream = env.fromElements(DataItem.class,
    new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
    private int sum = 0;

    @Override
    public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception {
        if (dataItem instanceof Value) {
            sum += ((Value) dataItem).getVal();
        } else {
            collector.collect(sum); // a Barrier: emit the sum seen so far
        }
    }
}).setParallelism(1).print();

env.execute();
// We should see 1 followed by 3 as output
However, such an operator cannot be parallelized, since the order of Values and Barriers matters.
That's why I need to set parallelism to 1 above. Is there a way to rewrite this to exploit
parallelism?
(Another reason to set parallelism to 1 above is that I’m assuming there is a single instance
of the FlatMapFunction. A proper implementation would take more care in using state. Feel
free to comment on that as well.) 

Best regards, 

Filip Niksic 
