From: Stig Rohde Døssing
Date: Thu, 1 Aug 2019 11:16:48 +0200
Subject: Re: Strange time aggregation behavior exhibited by BaseWindowedBolt
To: user@storm.apache.org

Regarding why you need the 5th tuple, it happens because you are using a timestamp field. The windowing code will receive the first 4 tuples and add them to the same window. Until it receives the 5th tuple, there is no way to tell whether the window is "done", as we might still receive more tuples that fall within the window. The 5th tuple acts as a trigger that tells the windowing code that the window with the first 4 tuples is now over and should be delivered to your bolt.

More specifically, the way it works is that there's a thread running which periodically (every 10 seconds in your case) sets a watermark. The watermark is set to be the timestamp of the newest received tuple, minus the lag. The watermark is then passed on to a trigger policy, which decides how to generate windows. The windows are generated from the watermark backwards, so if e.g. the newest tuple has timestamp 10 and your lag is 2, the watermark is 8, and with an interval of 3 it will try to generate windows for 0-2, 2-5 and 5-8. Note that the tuples with timestamps 9 and 10 aren't delivered yet, as you've said you expect up to 2 seconds of lag, so it isn't safe to close the window containing them yet. That window is 8-11, so we can't deliver 9 and 10 until the watermark reaches 11, i.e. until we see a tuple with a timestamp of at least 11 plus the lag, so 13.

See https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java and https://github.com/apache/storm/blob/925422a5b5ad1c3329a2c2b44db460ae94f70806/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java

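A minimal Scala sketch of the trigger rule described above, taking the newest tuple timestamp to be 10, the lag 2 and the interval 3. It assumes, as the example does, that window ends start at 2 and advance by the interval; `WatermarkSketch` and its methods are hypothetical names, and the sketch ignores the periodic watermark thread and Storm's own window alignment (WatermarkTimeTriggerPolicy holds the real logic).

```scala
// Sketch of a watermark-driven trigger loop (not Storm's actual code).
// Assumption: window ends start at `firstEnd` and advance by `interval`,
// matching the 0-2, 2-5, 5-8 boundaries used in the example.
object WatermarkSketch {
  // The watermark is the newest timestamp seen minus the allowed lag.
  def watermark(newestTs: Long, lag: Long): Long = newestTs - lag

  // Ends of every window that can be closed for watermark `wm`.
  def closableWindowEnds(firstEnd: Long, interval: Long, wm: Long): List[Long] =
    Iterator.iterate(firstEnd)(_ + interval).takeWhile(_ <= wm).toList

  // Smallest tuple timestamp that moves the watermark up to `windowEnd`.
  def tsNeededToClose(windowEnd: Long, lag: Long): Long = windowEnd + lag

  def main(args: Array[String]): Unit = {
    val lag = 2L
    val interval = 3L
    val wm = watermark(10L, lag)
    val closed = closableWindowEnds(2L, interval, wm)
    println(s"watermark=$wm, closed window ends=$closed")
    // watermark=8, closed window ends=List(2, 5, 8)
    val nextEnd = closed.last + interval // window 8-11 holds the tuples at 9 and 10
    println(s"window ending at $nextEnd needs a tuple with ts >= ${tsNeededToClose(nextEnd, lag)}")
    // window ending at 11 needs a tuple with ts >= 13
  }
}
```

In this model a window ending at E is only closed once E is at or below the watermark, so it needs a tuple with a timestamp of at least E plus the lag.
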
Regarding why your tuples are getting split into two windows, I don't know. Are you maybe running multiple tasks of the windowing bolt?

On Tue, 30 Jul 2019 at 16:11, Sandeep Singh <tosandeepsingh@gmail.com> wrote:
> Sorry for the multiple messages with the same subject; I had to register with a different email address.
> To follow up on the thread, can someone explain why tuples with the same timestamp are sometimes sent to two different time windows? And why is an extra 5th tuple required before Storm invokes the execute method? Do I need to set a different value for the tumbling window duration or lag?
> Thank you for your help in advance
> Sandeep
>
> On Mon, Jul 29, 2019 at 7:27 PM Sandeep Singh <tosandeepsingh@gmail.com> wrote:
>
>> During testing of my topology, which uses Storm's tumbling window, I see strange behavior in how my stream of tuples is handled and split into different time windows.
>>
>> I am using a tumbling window with both the duration and the lag set to 10 seconds:
>>
>>     val duration = BaseWindowedBolt.Duration.seconds(10)
>>
>>     myBolt.withTumblingWindow(duration).withTimestampField("timestampField").withLag(duration)
>>
>> When I send four tuples with the timestamp set to the same value, "now - 1 second" (where now = System.currentTimeMillis()), I see log messages showing that Storm is able to extract the time information from the tuples. However, the bolt's "execute(inputWindow: TupleWindow)" method never gets invoked. In my test I wait for 2 minutes. I do not see any log message about late tuples.
>>
>> When I send five tuples, the first four with timestamp "now - 1 second" and the last one with "now + 1 hour", I see Storm is able to extract all five tuples. However, the execute(inputWindow: TupleWindow) method is either invoked
>>
>>   a) only once, with the first four tuples (the behavior I expected), or
>>
>>   b) twice, with tuples 1 & 2 in the first invocation and tuples 3 & 4 in the second. Since all four tuples have exactly the same timestamp, I don't understand why the tuples are partitioned into different time windows.
>>
>> Also, the bolt's execute method never gets invoked with the 5th tuple. However, sending the 5th tuple (which is well outside the 10-second time window) ensures that the execute method is called once or twice for the first four tuples.
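
The "extra 5th tuple" behaviour described in this message can be reproduced with a simplified model of the watermark rule from the reply. This Scala sketch assumes tumbling windows of the form (end - length, end] whose ends are multiples of the window length, a watermark recomputed immediately instead of by Storm's periodic thread, and times in seconds with a hypothetical "now" of 100; `TumblingSketch`, `Event`, `windowEnd` and `deliverable` are illustrative names, not Storm API. It does not attempt to explain the split into two windows.

```scala
// Simplified event-time tumbling window (a sketch, not Storm's implementation).
// Assumptions: windows are (end - length, end] with `end` a multiple of
// `length`, and the watermark is recomputed immediately on every call.
object TumblingSketch {
  final case class Event(id: Int, ts: Long)

  // End of the tumbling window containing `ts` (ceiling to a multiple of length).
  def windowEnd(ts: Long, length: Long): Long =
    Math.floorDiv(ts + length - 1, length) * length

  // Windows a watermark-driven trigger would deliver: those whose end is
  // <= watermark, where watermark = newest timestamp - lag.
  def deliverable(events: Seq[Event], length: Long, lag: Long): List[(Long, List[Int])] =
    if (events.isEmpty) Nil
    else {
      val watermark = events.map(_.ts).max - lag
      events
        .groupBy(e => windowEnd(e.ts, length))
        .toList
        .filter { case (end, _) => end <= watermark }
        .sortBy(_._1)
        .map { case (end, group) => (end, group.map(_.id).toList) }
    }

  def main(args: Array[String]): Unit = {
    val length = 10L // withTumblingWindow(Duration.seconds(10))
    val lag = 10L    // withLag(Duration.seconds(10))
    val now = 100L   // hypothetical "now", in seconds
    val four = (1 to 4).map(i => Event(i, now - 1))
    println(deliverable(four, length, lag)) // List(): watermark 89 < window end 100
    val five = four :+ Event(5, now + 3600)
    println(deliverable(five, length, lag)) // List((100,List(1, 2, 3, 4)))
  }
}
```

In this model the 5th tuple's own window (ending at 3700) stays open until a tuple with a timestamp of at least 3710 arrives, which is consistent with execute never being invoked for it.
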
