flink-dev mailing list archives

From Stefano Bortoli <stefano.bort...@huawei.com>
Subject RE: Question about the process order in stream aggregate
Date Tue, 11 Apr 2017 12:10:35 GMT
Hi Xingcan,

Are you using parallelism 1 for the test? procTime semantics deals with the objects as they
are loaded into the operators. It could be that co-occurring partitioned events (in the same
millisecond time frame) are processed in parallel, so the output is produced in a different order.
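
To illustrate the point, here is a minimal plain-Scala sketch (no Flink involved) of a per-product running sum that emits one result per row in arrival order. The `ProcTimeOrderSketch` object, the `runningSums` helper and the `swapped` arrival order are hypothetical names made up for this illustration. It only models the arithmetic, not how Flink actually schedules the rows.

```scala
object ProcTimeOrderSketch {
  case class Order(user: Long, product: String, amount: Int)
  case class Result(product: String, total: Int)

  // Running sum per product, emitting one result per row in arrival order.
  def runningSums(arrivals: Seq[Order]): Seq[Result] = {
    val totals = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
    arrivals.map { o =>
      totals(o.product) += o.amount
      Result(o.product, totals(o.product))
    }
  }

  // The sample orders from the original question, in source order.
  val inOrder: Seq[Order] = Seq(
    Order(1L, "beer", 1),
    Order(2L, "diaper", 2),
    Order(3L, "diaper", 3),
    Order(4L, "rubber", 4))

  // Assumption for illustration: the two "diaper" rows reach the aggregate swapped.
  val swapped: Seq[Order] = Seq(inOrder(0), inOrder(2), inOrder(1), inOrder(3))

  def main(args: Array[String]): Unit = {
    runningSums(inOrder).foreach(println)
    println("---")
    runningSums(swapped).foreach(println)
  }
}
```

With the swapped arrival order, the first diaper result is 3 rather than 2, while the final diaper total is 5 either way. That is the same pattern as the unexpected output reported in the question.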

I suggest you have a look at the integration test to verify that the configuration of your
experiment is correct.


-----Original Message-----
From: Xingcan Cui [mailto:xingcanc@gmail.com] 
Sent: Tuesday, April 11, 2017 5:31 AM
To: dev@flink.apache.org
Subject: Question about the process order in stream aggregate

Hi all,

I ran some tests for stream aggregation on rows. The data stream is simply registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime() rows between unbounded
preceding and current row) from orderA.

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the rows "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are processed out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order for them can
always be preserved.
