flink-dev mailing list archives

From Radu Tudoran <radu.tudo...@huawei.com>
Subject RE: Question about the process order in stream aggregate
Date Tue, 11 Apr 2017 12:40:15 GMT
Hi Xingcan,

If you need to guarantee the order also in the case of procTime, one trick is to set the time
characteristic of the environment to processing time and to assign the processing time as the
timestamp of the incoming stream. You can do this via .assignTimestampsAndWatermarks(new ...)
and by overriding extractTimestamp:

  override def extractTimestamp(
      element: type...,
      previousElementTimestamp: Long): Long = {
    // use the current processing (wall-clock) time as the record's timestamp
    System.currentTimeMillis()
  }

Alternatively you can play around with the stream source and control the time when the events
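The idea behind the timestamp trick is that the stamp is attached while the records are still in source order, before any parallel hop can shuffle them. The plain-Scala model below illustrates only that principle, without Flink. `Stamped`, `stampAtSource` and `restoreOrder` are hypothetical names, a counter stands in for `System.currentTimeMillis()` (which can give equal stamps to records arriving in the same millisecond), and the downstream operator is assumed to buffer records and emit them sorted by stamp. Whether a particular Flink operator actually orders by the assigned timestamp depends on the query's time semantics, which this sketch does not model.

```scala
// Hypothetical, Flink-free model: stamp records while they are in source order,
// let a parallel hop reorder them, then sort by the stamp downstream.
final case class Order(user: Long, product: String, amount: Int) // record type from the thread
final case class Stamped[T](timestamp: Long, value: T)

object TimestampReorder {
  /** Assigns strictly increasing stamps in source order. A counter replaces
    * System.currentTimeMillis() so that no two records share a stamp. */
  def stampAtSource[T](records: Seq[T]): Vector[Stamped[T]] =
    records.toVector.zipWithIndex.map { case (r, i) => Stamped(i.toLong, r) }

  /** Assumed downstream behaviour: buffer everything that arrived and emit it
    * sorted by stamp. sortBy is stable, so equal stamps keep arrival order. */
  def restoreOrder[T](arrived: Seq[Stamped[T]]): Vector[T] =
    arrived.toVector.sortBy(_.timestamp).map(_.value)
}
```

For example, if the stamped diaper records arrive swapped (stamp 2 before stamp 1), `restoreOrder` puts `Order(2L, "diaper", 2)` back in front of `Order(3L, "diaper", 3)`.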

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division

German Research Center
Munich Office
Riesstrasse 25, 80992 München

-----Original Message-----
From: fhueske@gmail.com [mailto:fhueske@gmail.com] 
Sent: Tuesday, April 11, 2017 2:24 PM
To: Stefano Bortoli; dev@flink.apache.org
Subject: RE: Question about the process order in stream aggregate

Resending to dev@f.a.o

Hi Xingcan,

This is expected behavior. In general, it is not possible to guarantee results for processing

Your query is translated as follows:

CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n) -fwd-> MapFunc(n) -fwd-> Sink(n)

The order of records changes at the connection between the source and the first map function.
Here, records are distributed round-robin to increase the parallelism from 1 to n. The parallel
map instances run independently, so they might forward the records to the ProcessFunction that
computes the aggregation in a different order than the source emitted them.
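The reordering can be reproduced with a small plain-Scala model of the first two hops of the plan. It is a sketch under assumptions, not Flink code: `roundRobin` and `interleave` are hypothetical helpers, and the `schedule` argument stands in for whatever order the independent map instances happen to forward records in. Each map instance's own output stays in FIFO order, and the hash-partitioned edge keeps the order per sender, so the order a key's records reach the ProcessFunction is the order of the merged stream restricted to that key.

```scala
// Hypothetical, Flink-free model of
//   CollectionSrc(1) -round-robin-> MapFunc(n)
// followed by the merge of the n map outputs at the keyed ProcessFunction.
final case class Order(user: Long, product: String, amount: Int)

object RoundRobinReorder {
  /** Sends record i to map instance i % n, as a round-robin edge does. */
  def roundRobin[T](records: Seq[T], n: Int): Vector[Vector[T]] =
    Vector.tabulate(n) { i =>
      records.toVector.zipWithIndex.collect { case (r, idx) if idx % n == i => r }
    }

  /** Merges the map outputs. `schedule` is a hypothetical list of instance
    * indices saying which instance forwards its next record; every instance
    * still emits its own records in FIFO order. */
  def interleave[T](queues: Vector[Vector[T]], schedule: Seq[Int]): Vector[T] = {
    val next = Array.fill(queues.size)(0)
    schedule.toVector.map { i =>
      val record = queues(i)(next(i))
      next(i) += 1
      record
    }
  }
}
```

With n = 2, instance 0 receives beer(1) and diaper(3), instance 1 receives diaper(2) and rubber(4). The schedule `Seq(0, 1, 0, 1)` reproduces the source order, while `Seq(0, 0, 1, 1)` delivers diaper(3) before diaper(2), which is the situation behind the `Result(diaper,3)` line in the original question.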

Hope this helps,

From: Stefano Bortoli
Sent: Tuesday, April 11, 2017 2:10 PM
To: dev@flink.apache.org
Subject: RE: Question about the process order in stream aggregate

Hi Xingcan,

Are you using parallelism 1 for the test? procTime semantics process the objects as they are
loaded into the operators. It could be that co-occurring partitioned events (within the same
millisecond) are processed in parallel, so the output is produced in a different order.

I suggest having a look at the integration tests to verify that your experiment is configured
correctly.


-----Original Message-----
From: Xingcan Cui [mailto:xingcanc@gmail.com] 
Sent: Tuesday, April 11, 2017 5:31 AM
To: dev@flink.apache.org
Subject: Question about the process order in stream aggregate

Hi all,

I ran some tests for stream aggregation on rows. The data stream is simply registered as

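import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
// assumes env, tEnv and the Order case class are defined elsewhere in the test

val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)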

and the SQL query is defined as

select product, sum(amount) over (partition by product order by procTime()
rows between unbounded preceding and current row) from OrderA
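The `rows between unbounded preceding and current row` frame makes the query emit, for every row, the sum of all rows of the same product that have arrived so far, so each intermediate result depends on arrival order. A minimal plain-Scala sketch of that semantics follows; `OverAggregate.runningSumsPerProduct` is a hypothetical helper, and it assumes the `Order(user, product, amount)` fields used in the registration above.

```scala
// Hypothetical model of
//   sum(amount) over (partition by product order by procTime()
//                     rows between unbounded preceding and current row)
// for rows that reach the aggregation in the given order.
final case class Order(user: Long, product: String, amount: Int)

object OverAggregate {
  /** For each product, the running sums emitted, one per input row. */
  def runningSumsPerProduct(arrival: Seq[Order]): Map[String, Vector[Int]] = {
    val rows = arrival.toVector
    rows.map(_.product).distinct.map { p =>
      // scanLeft yields prefix sums starting at the seed 0; tail drops the seed,
      // leaving one running sum per row of the partition, in arrival order
      p -> rows.filter(_.product == p).map(_.amount).scanLeft(0)(_ + _).tail
    }.toMap
  }
}
```

In source order the diaper partition yields 2 and then 5; if diaper(3) arrives first it yields 3 and then 5. The final sum is the same, only the intermediate result differs.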

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5)

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5)

It seems that the rows "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are processed out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order of these rows is
always preserved.
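Compared with the plan of the SQL query, this job has no round-robin hop: the parallelism-1 source hands records straight to the hash-partitioned edge. Because a single sender writes to each receiver over one channel, records that land on the same receiver arrive in the sender's order. The plain-Scala sketch below models that. `hashPartition` is a hypothetical helper that uses `hashCode` modulo `n` rather than Flink's actual key assignment, and it keys by `product` to mirror the query's `partition by` clause.

```scala
// Hypothetical model of source(1) -hash-> map(n): one sender routes each
// record to a receiver by key hash, and every channel delivers in FIFO order.
final case class Order(user: Long, product: String, amount: Int)

object HashFromSingleSender {
  def hashPartition[T, K](records: Seq[T], n: Int)(key: T => K): Vector[Vector[T]] = {
    val in = records.toVector
    Vector.tabulate(n) { i =>
      // filter keeps the sender's order, modelling one FIFO channel per receiver
      in.filter(r => Math.floorMod(key(r).hashCode, n) == i)
    }
  }
}
```

Whatever receiver the diaper key maps to, it sees diaper(2) before diaper(3), because there is only one upstream sender whose output order could matter.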

