From: Jan Lukavský <je.ik@seznam.cz>
To: dev@beam.apache.org
Subject: Re: [External] Re: Memory Issue When Running Beam On Flink
Date: Wed, 26 Aug 2020 09:20:07 +0200

On 8/25/20 9:27 PM, Maximilian Michels wrote:
>> I agree that this probably solves the described issue in the most straightforward way, but special handling for global window feels weird, as there is really nothing special about global window wrt state cleanup.
>
> Why is special handling for the global window weird? After all, it is a special case, because the global window normally will only be cleaned up when the application terminates.

The inefficiency described happens if and only if the following two conditions are met:

 a) there are many timers per single window (as otherwise they will be negligible)

 b) there are many keys which actually contain no state (as otherwise the timer would be negligible wrt the state size)

It only happens to be the case that the global window is by far (maybe 98% of cases) the most common case that satisfies these two conditions, but there are other cases as well (e.g. a long-lasting fixed window). The discussed options 2) and 3) are systematic in the sense that option 2) cancels property a) and option 3) cancels property b). Making use of the correlation of the global window with these two conditions to solve the issue is of course possible, but a little unsystematic, and that's what feels 'weird'. :)

>> It doesn't change anything wrt migration. The timers that were already set remain and keep on contributing to the state size.
>
> That's ok, regular timers for non-global windows need to remain set and should be persisted. They will be redistributed when scaling up and down.
>
>> I'm not sure that's a "problem", rather an inefficiency. But we could address it by deleting the timers where they are currently set, as mentioned previously.
>
> I had imagined that we don't even set these timers for the global window. Thus, there is no need to clean them up.
>
> -Max
>
> On 25.08.20 09:43, Jan Lukavský wrote:
>> I agree that this probably solves the described issue in the most straightforward way, but special handling for global window feels weird, as there is really nothing special about global window wrt state cleanup. A solution that handles all windows equally would be semantically 'cleaner'. If I try to sum up:
>>
>>   - option 3) seems best, provided that the isEmpty() lookup is cheap for every state backend (e.g. that we do not hit disk multiple times); this option is the best for state size wrt timers in all windows
>>
>>   - option 2) works well for key-aligned windows, also reduces state size in all windows
>>
>>   - option "watermark timer" - solves the issue and is easily implemented, but doesn't improve the situation for non-global windows
>>
>> My conclusion would be - use the watermark timer as a hotfix; if we can prove that isEmpty() would be cheap, then use option 3) as the final solution, otherwise use 2).
>>
>> WDYT?
>>
>> On 8/25/20 5:48 AM, Thomas Weise wrote:
>>> On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels wrote:
>>>
>>>     I'd suggest a modified option (2) which does not use a timer to perform the cleanup (as mentioned, this will cause problems with migrating state).
>>>
>>> That's a great idea. It's essentially a mix of 1) and 2) for the global window only.
>>>
>>> It doesn't change anything wrt migration. The timers that were already set remain and keep on contributing to the state size.
>>>
>>> I'm not sure that's a "problem", rather an inefficiency. But we could address it by deleting the timers where they are currently set, as mentioned previously.
>>>
>>>     Instead, whenever we receive a watermark which closes the global window, we enumerate all keys and clean up the associated state.
>>>
>>>     This is the cleanest and simplest option.
>>>
>>>     -Max
>>>
>>>     On 24.08.20 20:47, Thomas Weise wrote:
>>>     > On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský wrote:
>>>     >
>>>     >      > The most general solution would be 3), given it can be agnostic to window types and does not assume extra runner capabilities.
>>>     >
>>>     >     Agree, 2) is an optimization to that. It might be questionable if this is premature optimization, but generally querying multiple states for each clear operation on any state might be prohibitive, mostly when the state would be stored in an external database (in the case of Flink that would be RocksDB).
>>>     >
>>>     > For the use case I'm looking at, we are using the heap state backend. I have not checked RocksDB, but would assume that the incremental cost of isEmpty() for other states under the same key is negligible?
>>>     >
>>>     >      > 3) wouldn't require any state migration.
>>>     >
>>>     >     Actually, it would, as we would (ideally) like to migrate users' pipelines that already contain timers for the end of the global window, which might not expire ever.
>>>     >
>>>     > Good catch. This could potentially be addressed by upgrading the timer in the per-record path.
>>>     >
>>>     >     On 8/24/20 7:44 PM, Thomas Weise wrote:
>>>     >>     On Fri, Aug 21, 2020 at 12:32 AM Jan Lukavský wrote:
>>>     >>
>>>     >>         If there are runners that are unable to efficiently enumerate keys in state, then there probably isn't a runner-agnostic solution to this. If we focus on Flink, we can provide a specific implementation of CleanupTimer, which might then do anything from the mentioned options. I'd be +1 for option 2) for key-aligned windows (all currently supported) and option 3) for unaligned windows in the future.
>>>     >>
>>>     >>     The most general solution would be 3), given it can be agnostic to window types and does not assume extra runner capabilities. It would require introspecting all user states for a given key on state.clear. That assumes an efficient implementation of isEmpty(). If all states are empty (have been cleared), then we can remove the cleanup timer. And add it back on state.add. I'm planning to give that a shot (for Flink/portable/streaming) to see how it performs.
>>>     >>
>>>     >>         We should also consider how we migrate users from the current state to any future implementation. In case of option 2) it should be possible to do this when the state is loaded from a savepoint, but I'm not 100% sure about that.
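To make option 3) concrete, here is a minimal, hypothetical sketch of the introspection idea described a few paragraphs above - keep the window-GC timer only while a key actually holds user state, drop it when the last state cell is cleared, and re-add it on the next write. The interface and class names are invented for illustration; this is not Beam's actual CleanupTimer or BagUserStateHandler code.

import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of option 3): keep a cleanup timer only for keys with non-empty state. */
class StateAwareCleanupTimer {

  /** Minimal timer-service abstraction standing in for the runner's real one. */
  interface TimerService {
    void setTimer(String key, long timestamp);
    void deleteTimer(String key, long timestamp);
  }

  private final TimerService timers;
  private final long windowGcTimestamp;
  // Number of non-empty user state cells per key; an absent key means no cleanup timer is needed.
  private final Map<String, Integer> nonEmptyStates = new HashMap<>();

  StateAwareCleanupTimer(TimerService timers, long windowGcTimestamp) {
    this.timers = timers;
    this.windowGcTimestamp = windowGcTimestamp;
  }

  /** Called when a user state cell for this key goes from empty to non-empty (state.add). */
  void onStateWrite(String key) {
    int count = nonEmptyStates.merge(key, 1, Integer::sum);
    if (count == 1) {
      // First non-empty state cell for this key: register the window-GC timer.
      timers.setTimer(key, windowGcTimestamp);
    }
  }

  /** Called when a user state cell for this key is cleared (state.clear) and isEmpty() now holds. */
  void onStateClear(String key) {
    Integer count = nonEmptyStates.get(key);
    if (count == null) {
      return; // nothing was tracked for this key
    }
    if (count == 1) {
      nonEmptyStates.remove(key);
      // The last state cell for this key became empty: the GC timer is no longer needed.
      timers.deleteTimer(key, windowGcTimestamp);
    } else {
      nonEmptyStates.put(key, count - 1);
    }
  }
}

Keys that never accumulate state never get a timer, which addresses condition b) above; the open question raised in the thread is whether this isEmpty()-style bookkeeping stays cheap on state backends such as RocksDB.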
>>>     >>
>>>     >>     3) wouldn't require any state migration.
>>>     >>
>>>     >>         Jan
>>>     >>
>>>     >>         On 8/21/20 6:25 AM, Thomas Weise wrote:
>>>     >>> Thanks for the clarification.
>>>     >>>
>>>     >>> Here are a few potential options to address the issue, based on the discussion so far:
>>>     >>>
>>>     >>> 1) Optionally skip the cleanup timer for the global window (user-controlled via pipeline option)
>>>     >>>
>>>     >>> 2) Instead of setting a cleanup timer for every key, handle all keys for a given window with a single timer. This would be runner specific and depend on if/how a given runner supports key enumeration. Flink's keyed state backend supports enumerating keys for a namespace (Beam window) and state tag. [1]
>>>     >>>
>>>     >>> 3) Set the cleanup timer only when there is actually state associated with a key. This could be accomplished by intercepting append and clear in BagUserStateHandler [2] and adding/removing the timer appropriately.
>>>     >>>
>>>     >>> 4) See if TTL support in the runner is applicable; for Flink see [3]
>>>     >>>
>>>     >>> [1] https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L76
>>>     >>> [2] https://github.com/apache/beam/blob/release-2.23.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L315
>>>     >>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>>     >>>
>>>     >>> On Thu, Aug 20, 2020 at 8:08 AM Reuven Lax wrote:
>>>     >>> Also +1 to what Jan said. Streaming pipelines can process bounded PCollections on some paths, so the global window will terminate for those paths. This is also true for the direct runner tests where PCollections pretend to be unbounded, but we then advance the watermark to +inf to terminate the pipeline.
>>>     >>>
>>>     >>> On Thu, Aug 20, 2020 at 8:06 AM Reuven Lax wrote:
>>>     >>> It is not Dataflow specific, but I think Dataflow is the only runner that currently implements Drain: https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit
>>>     >>>
>>>     >>> When a pipeline is drained, all windows (including global windows) end, and the windows are processed (i.e. as if they were fixed windows that terminated).
>>>     >>> Currently the easiest way to ensure that is to rely on the end-of-window timers for the global window (alternatives are possible, like issuing a full-state scan when a pipeline is drained, but that would be quite a bit more complicated). This is not specifically the GC timer, but rather the end-of-window timer that is needed.
>>>     >>>
>>>     >>> I believe that right now we don't have a way of deleting timers if there are no elements buffered for a key (e.g. a key that received a few elements that were processed in a trigger and then never received any more elements). This might be part of the problem - large numbers of empty keys with noop timers set. It would be nice if there were a way to detect this and at least remove the timers for those empty keys.
>>>     >>>
>>>     >>> Reuven
>>>     >>>
>>>     >>> On Wed, Aug 19, 2020 at 9:20 PM Thomas Weise wrote:
>>>     >>> On Wed, Aug 19, 2020 at 9:49 AM Reuven Lax wrote:
>>>     >>>     Skipping the cleanup timer for the global window will break any sort of drain functionality, which relies on having those timers there. It's also necessary for bounded inputs, for the same reason.
>>>     >>>
>>>     >>> Can you say a bit more about why this will break drain functionality and bounded inputs? Is this Dataflow specific? Is it because the state would be reused by a subsequent instance of the pipeline?
>>>     >>>
>>>     >>> For Flink, the GC timers would be triggered by the final watermark and that will be the end of the streaming job. Launching the same pipeline again will either be a cold start with no previous state or a start from savepoint/checkpoint.
>>>     >>>
>>>     >>> It sounds like for Dataflow there may be a need for the user to influence the behavior, while for Flink the GC timers in a global window are not required.
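As a rough sketch of what option 2) - one cleanup pass per window instead of one timer per key - could look like on Flink, assuming direct access to the operator's KeyedStateBackend (which is what link [1] above points at); the state name and namespace below are illustrative placeholders, not Beam's actual identifiers:

import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.state.KeyedStateBackend;

class GlobalWindowCleanupSketch {

  /**
   * When the final watermark (or the window's GC time) arrives, enumerate every key that has
   * state registered under the window's namespace and clear it, instead of relying on one
   * cleanup timer per key. "userStateName" and "windowNamespace" are placeholders.
   */
  static void cleanupAllKeys(
      KeyedStateBackend<ByteBuffer> backend, String userStateName, String windowNamespace) {
    // Materialize the keys first so state is not mutated while the backend is still streaming them.
    List<ByteBuffer> keys;
    try (Stream<ByteBuffer> keyStream = backend.getKeys(userStateName, windowNamespace)) {
      keys = keyStream.collect(Collectors.toList());
    }
    for (ByteBuffer key : keys) {
      backend.setCurrentKey(key);
      // ...clear the user state (and any pending cleanup timer) registered for this key/window.
    }
  }
}

This is inherently runner-specific: it only works where the runner can enumerate keys for a state namespace, which is exactly the caveat raised earlier in the thread.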
>>>     >>> On Wed, Aug 19, 2020 at 10:31 AM Reuven Lax wrote:
>>>     >>> On Wed, Aug 19, 2020 at 9:53 AM Steve Niemitz wrote:
>>>     >>>     for what it's worth, dataflow has the same problem here as well. We've also worked around it by (optionally) disabling the cleanup timer in global windows. But I agree, having drain then be an unsafe operation is not great.
>>>     >>>
>>>     >>> Dataflow does not require the timers to be in memory though, so unless the numbers get very large (to the point where you run out of disk storage storing the timers), it will not cause your pipelines to fail.
>>>     >>>
>>>     >>>     I think for batch it's less of an issue since basically everything is in the global window anyways, and batch pipelines run for a fixed amount of time on a fixed input source. For streaming pipelines, it's much easier to run into this.
>>>     >>>
>>>     >>> On Wed, Aug 19, 2020 at 12:50 PM Reuven Lax wrote:
>>>     >>> @OnWindowExpiration is a per-key callback.
>>>     >>>
>>>     >>> On Wed, Aug 19, 2020 at 9:48 AM Luke Cwik wrote:
>>>     >>> With the addition of @OnWindowExpiration, a single timer across keys optimization would still make sense.
>>>     >>>
>>>     >>> On Wed, Aug 19, 2020 at 8:51 AM Thomas Weise wrote:
>>>     >>> https://issues.apache.org/jira/browse/BEAM-10760
>>>     >>>
>>>     >>> I confirmed that skipping the cleanup timers resolves the state leak that we observe in the pipeline that uses a global window.
>>>     >>>
>>>     >>> @Luke the GC is key partitioned and relies on StateInternals. That makes it impractical to have a single timer that performs cleanup for multiple keys, at least in a runner-agnostic way.
>>>     >>>
>>>     >>> I would like to take a look if there is a need to have the GC timer for a global window to start with. Since the pipeline terminates, the runner discards all state anyways - at least in the case of Flink.
>>>     >>>
>>>     >>> Thomas
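For reference, a small sketch of the kind of per-key stateful DoFn this discussion is about, using @OnWindowExpiration to release per-key state when the window expires. The names and the simplistic first-element-wins logic are illustrative only; this is not the actual Deduplicate transform.

import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Illustrative stateful DoFn: emits the first element seen per key and drops later duplicates. */
class DedupPerKeyFn extends DoFn<KV<String, String>, String> {

  @StateId("seen")
  private final StateSpec<ValueState<Boolean>> seenSpec = StateSpecs.value(BooleanCoder.of());

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("seen") ValueState<Boolean> seen,
      OutputReceiver<String> out) {
    if (seen.read() == null) {
      seen.write(true);
      out.output(element.getValue());
    }
  }

  // Invoked once per key when the window expires, so per-key state can be released without
  // every key carrying its own long-lived cleanup timer in the global window.
  @OnWindowExpiration
  public void onWindowExpiration(@StateId("seen") ValueState<Boolean> seen) {
    seen.clear();
  }
}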
>>>     >>> On Mon, Aug 17, 2020 at 9:46 AM Luke Cwik wrote:
>>>     >>> For the cleanup timer.
>>>     >>>
>>>     >>> On Mon, Aug 17, 2020 at 9:45 AM Luke Cwik wrote:
>>>     >>> Replacing a timer for each key with just one timer for all keys would make sense for the global window.
>>>     >>>
>>>     >>> On Sun, Aug 16, 2020 at 5:54 PM Thomas Weise wrote:
>>>     >>> Thanks Jan. We observe a similar issue with state size growth in the global window (with the portable runner). We don't see this issue with non-global windows; there does not appear to be any residual. I will take a look at skipping the cleanup timers for the global window and see if that resolves the issue. These timers lead to potentially unbounded state growth and don't really serve a purpose.
>>>     >>>
>>>     >>> Thomas
>>>     >>>
>>>     >>> On Sun, Aug 16, 2020 at 1:16 AM Jan Lukavský wrote:
>>>     >>> Hi Catlyn,
>>>     >>>
>>>     >>> if you use the global window to perform the deduplication, then it should be expected to have as many timers as there are unique keys, plus one timer for each key that arrived during the last 30 minutes (because there is a timer set to clear the state in the deduplication function).
>>>     >>> The reason for that is that Beam creates a timer for the window garbage-collection time to clear state (see [1]). If it is the global window, then each key will have an associated timer forever (it might be an open question whether that makes sense in this case, or whether Beam can do any better).
>>>     >>>
>>>     >>> As I wrote before, it would probably help to use two deduplications in two successive fixed windows of length 30 minutes, shifted by 15 minutes (FixedWindows.of(30 minutes).withOffset(15 minutes)), so that the two windows overlap and catch duplicates that would appear near the boundary of the first window.
>>>     >>>
>>>     >>> @Max, do you think it would be possible to schedule the cleanup timer only when there is actually data in state for a given key? The timer would be cleared on a call to `clear()`, but would have to be set on every write. Or would it make sense not to schedule the cleanup timer for the global window at all?
>>>     >>>
>>>     >>> Jan
>>>     >>>
>>>     >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L334
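A rough sketch of that windowing suggestion in the Java SDK, using Distinct as a stand-in for the per-key deduplication discussed in this thread (the element type and transform names are illustrative):

import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class OverlappingWindowDedupSketch {

  static PCollection<String> dedup(PCollection<String> input) {
    // First pass: deduplicate within plain 30-minute fixed windows.
    PCollection<String> firstPass =
        input
            .apply("Window30m", Window.<String>into(FixedWindows.of(Duration.standardMinutes(30))))
            .apply("Dedup1", Distinct.<String>create());
    // Second pass: re-window into 30-minute windows shifted by 15 minutes, so duplicates that
    // straddle a boundary of the first windowing end up in a single window here.
    return firstPass
        .apply(
            "Window30mOffset15m",
            Window.<String>into(
                FixedWindows.of(Duration.standardMinutes(30))
                    .withOffset(Duration.standardMinutes(15))))
        .apply("Dedup2", Distinct.<String>create());
  }
}

Because the fixed windows eventually expire, the per-key state and cleanup timers stay roughly bounded by the keys seen in the currently active windows instead of growing forever.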
>>>     >>> On 8/15/20 5:47 PM, Catlyn Kong wrote:
>>>     >>> Hi!
>>>     >>>
>>>     >>> Thanks for the explanation! The screenshot actually shows all the new instances between marking the heap and taking a heap dump, so sorry if that's a little confusing. Here's what the full heap looks like: (attachment: Screen Shot 2020-08-15 at 8.31.42 AM.png)
>>>     >>>
>>>     >>> Our input stream has roughly 50 messages per second and the pipeline has been running for about 24 hours. Even assuming all the messages are unique, 5.5 million timers is still very surprising.
>>>     >>>
>>>     >>> We're allocating 11G for the taskmanager JVM heap, but it eventually gets filled up (after a couple of days) and the cluster ends up in a bad state. Here's a screenshot of the heap size over the past 24h: (attachment: Screen Shot 2020-08-15 at 8.41.48 AM.png)
>>>     >>>
>>>     >>> Could it be that the timers never got cleared out, or maybe the pipeline is creating more timer instances than expected?
>>>     >>>
>>>     >>> On Sat, Aug 15, 2020 at 4:07 AM Maximilian Michels wrote:
>>>     >>> Awesome! Thanks a lot for the memory profile. A couple of remarks:
>>>     >>>
>>>     >>> a) I can see that there are about 378k keys and each of them sets a timer.
>>>     >>> b) Based on the settings for DeduplicatePerKey you posted, you will keep track of all keys of the last 30 minutes.
>>>     >>>
>>>     >>> Unless you have much fewer keys, the behavior is to be expected. The memory sizes for the timer maps do not look particularly high (~12Mb).
>>>     >>>
>>>     >>> How much memory did you reserve for the task managers?*
>>>     >>>
>>>     >>> -Max
>>>     >>>
>>>     >>> *The image links give me a "504 error".
>>>     >>>
>>>     >>> On 14.08.20 23:29, Catlyn Kong wrote:
>>>     >>> Hi!
>>>     >>>
>>>     >>> We're indeed using the rocksdb state backend, so that might be part of the reason. Due to some security concerns, we might not be able to provide the full heap dump since we have some custom code path.
>>>     >>> But here's a screenshot from JProfiler: (attachment: Screen Shot 2020-08-14 at 9.10.07 AM.png)
>>>     >>>
>>>     >>> Looks like TimerHeapInternalTimer (initiated in InternalTimerServiceImpl) isn't getting garbage collected? As David has mentioned, the pipeline uses DeduplicatePerKey in Beam 2.22; ProcessConnectionEventFn is a simple stateless DoFn that just does some logging and emits the events. Is there any possibility that the timer logic or the way it's used in the dedupe ParDo can cause this leak?
>>>     >>>
>>>     >>> Thanks,
>>>     >>> Catlyn
>>>     >>>
>>>     >>> On Tue, Aug 11, 2020 at 7:58 AM Maximilian Michels wrote:
>>>     >>> Hi!
>>>     >>>
>>>     >>> Looks like a potential leak, caused by your code or by Beam itself.
>>>     >>> Would you be able to supply a heap dump from one of the task managers? That would greatly help debugging this issue.
>>>     >>>
>>>     >>> -Max
>>>     >>>
>>>     >>> On 07.08.20 00:19, David Gogokhiya wrote:
>>>     >>> Hi,
>>>     >>>
>>>     >>> We recently started using Apache Beam version 2.20.0 running on Flink version 1.9 deployed on kubernetes to process unbounded streams of data. However, we noticed that the memory consumed by stateful Beam is steadily increasing over time with no drops no matter what the current bandwidth is. We were wondering if this is expected and if not what would be the best way to resolve it.
>>>     >>>>                  >      > >>>     >>>>                  >      > >>>     >>>>                  > > >>>     >>>>                   More >>>     >>>>                  Context >>>     >>>>                  >      > >>>     >>>>                  >      > >>>     >>>>                  We have >>>     >>>>                  the >>>     >>>>                  following pipeline >>>     >>>>                  that >>>     >>>>                  consumes >>>     >>>>                  messages >>>     >>>>                  from the >>>     >>>>                  >  unbounded >>>     >>>>                  >      > >>>     >>>>                  stream >>>     >>>>                  of data. >>>     >>>>                  Later we >>>     >>>>                  deduplicate >>>     >>>>                  the >>>     >>>>                  messages >>>     >>>>                  based on >>>     >>>>                  unique >>>     >>>>                  >      > >>>     >>>>                  message >>>     >>>>                  id using >>>     >>>>                  the >>>     >>>>                  deduplicate >>>     >>>>                  function >>>     >>>>                  >      > >>>     >>>>                  > >>>     >>>> >>>   . >>>     >>>>                  > >>>     >>>>                  >      > >>>     >>>>                  Since we >>>     >>>>                  are >>>     >>>>                  using >>>     >>>>                  Beam >>>     >>>>                  version >>>     >>>>                  2.20.0, >>>     >>>>                  we >>>     >>>>                  copied >>>     >>>>                  the >>>     >>>>                  source code >>>     >>>>                  >     of the >>>     >>>>                  >      > >>>     >>>>                  deduplicate >>>     >>>>                  function >>>     >>>>                  >      > >>>     >>>>                  > >>>     >>>> >>>   from >>>     >>>>                  > >>>     >>>>                  >      > >>>     >>>>                  version >>>     >>>>                  2.22.0. >>>     >>>>                  After >>>     >>>>                  that we >>>     >>>>                  unmap >>>     >>>>                  the >>>     >>>>                  tuple, >>>     >>>>                  retrieve the >>>     >>>>                  >  necessary >>>     >>>>                  >      > >>>     >>>>                  data >>>     >>>>                  from >>>     >>>>                  message >>>     >>>>                  payload >>>     >>>>                  and dump >>>     >>>>                  the >>>     >>>>                  corresponding >>>     >>>>                  data into >>>     >>>>                  > >>>     >>>>                   the log. 
>>>     >>> Pipeline: [image]
>>>     >>>
>>>     >>> Flink configuration: [image]
>>>     >>>
>>>     >>> As we mentioned before, we noticed that the memory usage of the jobmanager and taskmanager pods is steadily increasing with no drops no matter what the current bandwidth is. We tried allocating more memory, but it seems like no matter how much memory we allocate it eventually reaches its limit and then it tries to restart itself.
>>>     >>>
>>>     >>> Sincerely,
>>>     >>> David