flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Xingcan Cui <xingc...@gmail.com>
Subject Re: Question about the process order in stream aggregate
Date Thu, 13 Apr 2017 01:42:02 GMT
Hi,

@Radu @Stefano, sorry that I misunderstood it before. We considered the
problem from different viewpoints. I agree that (ingestion) timestamp
injection could be a good solution for this problem in some scenarios.
Thanks.

@Fabian, thanks for your explanation. That makes sense.

Best,
Xingcan

On Thu, Apr 13, 2017 at 2:41 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Xingcan,
>
> the 0L timestamp literal is an artifact of how the Calcite query is
> translated by Flink.
> It represents the value of the procTime() function that is logically used
> to sort the data. Calcite expects this attribute in the schema but Flink's
> OVER operator actually processes the data based on the local wallclock time
> of the operator.
>
> So this is an unnecessary overhead at the moment, which hopefully will be
> resolved before the 1.3 release.
>
> Best, Fabian
>
> 2017-04-12 9:45 GMT+02:00 Stefano Bortoli <stefano.bortoli@huawei.com>:
>
> > I'm afraid that to keep order either you have to process it in a serial
> > way (parallelism 1), or provide an element that allows to sort the
> objects
> > when these are processed in parallel (i.e. rowTime). When you distribute
> > the computation, as Fabian explained, you get a round-robin assignment to
> > the different process functions, which may not respect the original input
> > order in the output.
> >
> > ProcessTime means that you don't care much about time as a sorting
> > reference for the computation of the result.
> >
> > What Radu suggested is to inject the timestamp in your dataStream before
> > processing, and then use rowTime semantics. It won't be "real row time"
> > because your function will inject the timestamp of "arrival", but it will
> > produce sorted output as you "order by rowTime". Hope it helps.
> >
> > Best,
> > Stefano
> >
> > -----Original Message-----
> > From: Xingcan Cui [mailto:xingcanc@gmail.com]
> > Sent: Wednesday, April 12, 2017 8:11 AM
> > To: dev@flink.apache.org
> > Subject: Re: Question about the process order in stream aggregate
> >
> > Hi everybody,
> >
> > thank you all for your help.
> >
> > @Fabian I also check the DataStream that translated from the query and
> try
> > to figure out what happens in each step. The results are as follows
> > (correct me please if there's something wrong):
> >
> > Source -> Map (Order to Row3) -> FlatMap (do project and extract
> > timestamp?) -> Partition (partition by product) ->BoundedOverAggregate
> > (aggregate) -> FlatMap (Row5 to Row2) -> Sink
> >
> > @Stefano. It's indeed unable to keep the order unless we set parallelism
> > of the first MapFunc to 1 (which is unpractical) or execute the partition
> > step in advance (seems to be unpractical too).
> >
> > Anyway, the procTime itself is actually a "blurred concept" that full of
> > uncertainty, right? Now I think it's better to use rowTime instead if the
> > application need order preserving.
> >
> > @Radu, the assignTimestampsAndWatermarks method seems to be useless,
> maybe
> > it only affects the rowTime?
> >
> > There's another question. I find the following code in the generated
> > FlatMap function (step 3 project and extract timestamp):
> >
> > ...
> > java.sql.Timestamp result$16;
> > if (false) {
> >     result$16 = null;
> > }
> > else {
> >     result$16 =
> > org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(0L);
> > }
> >
> > if (false) {
> >     out.setField(2, null);
> > }
> > else {
> >     out.setField(2, result$16);
> > }
> > ...
> >
> > Could you please help me explain what's the 0L timestamp mean?
> >
> > Best,
> > Xingcan
> >
> > On Tue, Apr 11, 2017 at 8:40 PM, Radu Tudoran <radu.tudoran@huawei.com>
> > wrote:
> >
> > > Hi Xingcan,
> > >
> > > If you need to guarantee the order also in the case of procTime a
> > > trick that you can do is to set the working time of the env to
> > > processing time and to assign the proctime to the incoming stream. You
> > can do this via .
> > > assignTimestampsAndWatermarks(new ...) And override override def
> > > extractTimestamp(
> > >       element: type...,
> > >       previousElementTimestamp: Long): Long = {
> > >       System.currentTimeMillis()
> > >     }
> > >
> > > Alternatively you can play around with the stream source and control
> > > the time when the events come
> > >
> > > Dr. Radu Tudoran
> > > Senior Research Engineer - Big Data Expert IT R&D Division
> > >
> > >
> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > German Research Center
> > > Munich Office
> > > Riesstrasse 25, 80992 München
> > >
> > > E-mail: radu.tudoran@huawei.com
> > > Mobile: +49 15209084330
> > > Telephone: +49 891588344173
> > >
> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > This e-mail and its attachments contain confidential information from
> > > HUAWEI, which is intended only for the person or entity whose address
> is
> > > listed above. Any use of the information contained herein in any way
> > > (including, but not limited to, total or partial disclosure,
> > reproduction,
> > > or dissemination) by persons other than the intended recipient(s) is
> > > prohibited. If you receive this e-mail in error, please notify the
> sender
> > > by phone or email immediately and delete it!
> > >
> > >
> > > -----Original Message-----
> > > From: fhueske@gmail.com [mailto:fhueske@gmail.com]
> > > Sent: Tuesday, April 11, 2017 2:24 PM
> > > To: Stefano Bortoli; dev@flink.apache.org
> > > Subject: AW: Question about the process order in stream aggregate
> > >
> > > Resending to dev@f.a.o
> > >
> > > Hi Xingcan,
> > >
> > > This is expected behavior. In general, is not possible to guarantee
> > > results for processing time.
> > >
> > > Your query is translated as follows:
> > >
> > > CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n)
> > > -fwd-> MapFunc(n) -fwd-> Sink(n)
> > >
> > > The order of records is changed because of the connection between
> source
> > > and first map function. Here, records are distributed round robin to
> > > increase the parallelism from 1 to n. The parallel instances of map
> might
> > > forward the records in different order to the ProcessFunction that
> > computes
> > > the aggregation.
> > >
> > > Hope this helps,
> > > Fabian
> > >
> > >
> > > Von: Stefano Bortoli
> > > Gesendet: Dienstag, 11. April 2017 14:10
> > > An: dev@flink.apache.org
> > > Betreff: RE: Question about the process order in stream aggregate
> > >
> > > Hi Xingcan,
> > >
> > > Are you using parallelism 1 for the test?  procTime semantics deals
> with
> > > the objects as they loaded in the operators. It could be the
> co-occuring
> > > partitioned events (in the same MS time frame) are processed in
> parallel
> > > and then the output is produced in different order.
> > >
> > > I suggest you to have a look at the integration test to verify that the
> > > configuration of your experiment is correct.
> > >
> > > Best,
> > > Stefano
> > >
> > > -----Original Message-----
> > > From: Xingcan Cui [mailto:xingcanc@gmail.com]
> > > Sent: Tuesday, April 11, 2017 5:31 AM
> > > To: dev@flink.apache.org
> > > Subject: Question about the process order in stream aggregate
> > >
> > > Hi all,
> > >
> > > I run some tests for stream aggregation on rows. The data stream is
> > simply
> > > registered as
> > >
> > > val orderA: DataStream[Order] = env.fromCollection(Seq(
> > >       Order(1L, "beer", 1),
> > >       Order(2L, "diaper", 2),
> > >       Order(3L, "diaper", 3),
> > >       Order(4L, "rubber", 4)))
> > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),
> > >
> > > and the SQL is defined as
> > >
> > > select product, sum(amount) over (partition by product order by
> > procTime()
> > > rows between unbounded preceding and current row from orderA).
> > >
> > > My expected output should be
> > >
> > > 2> Result(beer,1)
> > > 2> Result(diaper,2)
> > > 1> Result(rubber,4)
> > > 2> Result(diaper,5).
> > >
> > > However, sometimes I get the following output
> > >
> > > 2> Result(beer,1)
> > > 2> Result(diaper,3)
> > > 1> Result(rubber,4)
> > > 2> Result(diaper,5).
> > >
> > > It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper",
> > 3)"
> > > are out of order. Is that normal?
> > >
> > > BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the
> > > order for them can always be preserved.
> > >
> > > Thanks,
> > > Xingcan
> > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message