From: Aljoscha Krettek
Date: Mon, 09 Jan 2017 15:52:17 +0000
Subject: Re: Regarding ordering of events
To: user@flink.apache.org
Reply-To: user@flink.apache.org

Hi,

to clarify what Kostas said. A "single window" in this case is a window for
a given key and time period so the window for "key1" in time t1 to t2 can be
processed on a different machine from the window for "key2" in time t1 to t2.

Cheers,
Aljoscha

On Thu, 5 Jan 2017 at 21:56 Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

> Hi Abdul,
>
> Every window is handled by a single machine, if this is what you mean by
> “partition”.
>
> Kostas
>
> On Jan 5, 2017, at 9:21 PM, Abdul Salam Shaikh <abd.salam.shaikh@gmail.com>
> wrote:
>
> Thanks Fabian and Kostas,
>
> How can I put to use the power of flink as a distributed system?
>
> In cases where we have multiple windows, is one single window handled by
> one partition entirely or is it spread across several partitions?
>
> On Thu, Jan 5, 2017 at 12:25 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
> Flink is a distributed system and does not preserve order across
> partitions.
> The number prefix (e.g., 1>, 2>, ...) tells you the parallel instance of
> the printing operator.
>
> You can set the parallelism to 1 to have the stream in order.
>
> Fabian
>
> 2017-01-05 12:16 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>
> Hi Abdul,
>
> Flink provides no ordering guarantees on the elements within a window.
> The only “order” it guarantees is that the results referring to window-1
> are going to be emitted before those of window-2 (assuming that window-1
> precedes window-2).
>
> Thanks,
> Kostas
>
> On Jan 5, 2017, at 11:57 AM, Abdul Salam Shaikh <abd.salam.shaikh@gmail.com>
> wrote:
>
> Hi,
>
> I am using a JSON file as the source for the streaming (in the ascending
> order of the field Umlaufsekunde) which has events as follows:
>
> {"event":[{"Umlaufsekunde":115}]}
> {"event":[{"Umlaufsekunde":135}]}
> {"event":[{"Umlaufsekunde":135}]}
> {"event":[{"Umlaufsekunde":145}]}
> {"event":[{"Umlaufsekunde":155}]}
> {"event":[{"Umlaufsekunde":155}]}
> {"event":[{"Umlaufsekunde":185}]}
> {"event":[{"Umlaufsekunde":195}]}
> {"event":[{"Umlaufsekunde":195}]}
> {"event":[{"Umlaufsekunde":205}]}
> {"event":[{"Umlaufsekunde":245}]}
>
> However, when I try to print the stream, it is unordered as given below:
>
> 1> (115,null,1483517983252,1190) -- The first value indicating Umlaufsekunde
> 2> (135,null,1483517984877,1190)
> 2> (155,null,1483517986861,1190)
> 4> (145,null,1483517985752,1190)
> 3> (135,null,1483517985424,1190)
> 4> (195,null,1483517990736,1190)
> 4> (255,null,1483517997424,1190)
> 2> (205,null,1483517991518,1190)
> 2> (275,null,1483517999330,1190)
> 2> (385,null,1483518865371,1190)
> 2> (395,null,1483518866840,1190)
> 1> (155,null,1483517986533,1190)
> 4> (285,null,1483518000189,1190)
> 4> (395,null,1483518866231,1190)
>
> I have also tried using the Timestamps and Watermarks but no luck as
> follows:
>
> public class TimestampExtractor implements
>         AssignerWithPeriodicWatermarks<Tuple5<String, Long, List<Lane>, Long, Long>> {
>
>     private long currentMaxTimestamp;
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp);
>     }
>
>     @Override
>     public long extractTimestamp(Tuple5<String, Long, List<Lane>, Long, Long> element,
>             long previousElementTimestamp) {
>         long timestamp = element.getField(1);
>         currentMaxTimestamp = timestamp;
>         return currentMaxTimestamp;
>     }
>
> }
>
> Could anyone suggest how do I handle this problem for the arrival of
> events in order?
>
> Thanks!
>
> --
> Thanks & Regards,
>
> Abdul Salam Shaikh
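
For readers of this thread, the watermark idea behind the question can be sketched in plain Java, independent of Flink. This is a minimal illustration and not the Flink API: the class name `WatermarkReorderBuffer`, its methods, and the out-of-orderness bound of 60 are all assumptions made for the example. It buffers timestamps, derives a watermark as "largest timestamp seen so far minus the bound", and releases buffered values in ascending order once the watermark passes them. Note that it tracks the largest timestamp with `Math.max`, whereas the extractor quoted above assigns the most recent timestamp directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Standalone sketch (plain Java, NOT the Flink API): re-orders timestamps using a watermark. */
final class WatermarkReorderBuffer {
    // Hypothetical bound: how far behind the largest timestamp an event may still arrive.
    private final long maxOutOfOrderness;
    // Min-heap, so the smallest buffered timestamp is always released first.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkReorderBuffer(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Buffers one timestamp and returns all buffered timestamps at or below the new watermark. */
    List<Long> add(long timestamp) {
        buffer.add(timestamp);
        // The watermark must never move backwards, hence max() rather than plain assignment.
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return drainUpTo(maxTimestampSeen - maxOutOfOrderness);
    }

    /** Releases everything still buffered, e.g. at end of input. */
    List<Long> flush() {
        return drainUpTo(Long.MAX_VALUE);
    }

    private List<Long> drainUpTo(long watermark) {
        List<Long> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        // Arrival order loosely based on the first values printed in the thread.
        long[] arrivals = {115, 135, 155, 145, 135, 195, 255, 205};
        WatermarkReorderBuffer reorder = new WatermarkReorderBuffer(60);
        List<Long> emitted = new ArrayList<>();
        for (long ts : arrivals) {
            emitted.addAll(reorder.add(ts));
        }
        emitted.addAll(reorder.flush());
        System.out.println(emitted); // prints [115, 135, 135, 145, 155, 195, 205, 255]
    }
}
```

An event that arrives later than the assumed bound would still come out of order in this sketch; it only illustrates ordering within one stream and says nothing about ordering across parallel instances, which Fabian's reply addresses separately.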