Date: Mon, 3 Jul 2017 14:52:00 +0000 (UTC)
From: Jean-Baptiste Onofré (JIRA)
To: commits@beam.apache.org
Subject: [jira] [Commented] (BEAM-2359) SparkTimerInternals inputWatermarkTime does not get updated in cluster mode

    [ https://issues.apache.org/jira/browse/BEAM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072557#comment-16072557 ]

Jean-Baptiste Onofré commented on BEAM-2359:
--------------------------------------------

Any update on this Jira for the 2.1.0 release?

> SparkTimerInternals inputWatermarkTime does not get updated in cluster mode
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-2359
>                 URL: https://issues.apache.org/jira/browse/BEAM-2359
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Aviem Zur
>             Fix For: 2.1.0
>
>
> {{SparkTimerInternals#inputWatermarkTime}} does not get updated in cluster mode.
> This causes windows to never close and state to grow without bound in memory, while processing time increases until the application eventually crashes (also, triggers based on the watermark do not fire).
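For readers unfamiliar with why this class of bug only shows up in cluster mode: static JVM state set on the driver is never serialized along with a task closure, so a fresh executor JVM sees the field's default value (null). The minimal, Spark-free sketch below illustrates just that mechanism; {{WatermarkHolder}} and {{Task}} are hypothetical stand-ins for illustration, not Beam's actual classes.

```java
import java.io.*;

public class StaticHolderDemo {

    static class WatermarkHolder {
        // Set on the driver; statics are never serialized, so a fresh
        // executor JVM sees this field's default value: null.
        static String broadcastRef;
    }

    static class Task implements Serializable {
        String run() {
            // Reads the static at execution time, analogous to the code
            // inside updateStateByKey reading GlobalWatermarkHolder.
            return WatermarkHolder.broadcastRef;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Driver": install the static reference, then ship the task.
        WatermarkHolder.broadcastRef = "watermark@driver";
        byte[] shipped = serialize(new Task());

        // "Executor": simulate a fresh JVM where the static was never set.
        WatermarkHolder.broadcastRef = null;
        Task onExecutor = (Task) deserialize(shipped);

        // The static did not travel with the closure.
        System.out.println(onExecutor.run()); // prints "null"
    }
}
```

In local mode the "executor" step never happens: driver and executor share one JVM, so the static set by the driver is still visible, which is why the bug is invisible in local runs.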
> The root cause is a call from within the {{updateStateByKey}} operation in [SparkGroupAlsoByWindowViaWindowSet|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L241-L242], which tries to access a static reference to a {{GlobalWatermarkHolder}} broadcast variable. In cluster mode this static reference is a different one in the executor's JVM and is null there (local mode works because the executor and the driver share the same JVM).
> Alternative solutions (and viability of each):
> * -Broadcast variable passed to the {{updateStateByKey}} operator- - Not viable: even if we used the broadcast correctly, broadcast variables cannot be used in this case (from within {{updateStateByKey}}), because {{updateStateByKey}} is a {{DStream}} operator and not an {{RDD}} operator, so the broadcast would not be refreshed every micro-batch but would retain its initial value.
> * -Broadcast variable to update the data in an additional transform- - Create an additional transform on the {{DStream}}'s RDDs prior to the {{DStream}} operator {{updateStateByKey}} and use a broadcast which does get updated (since this is an {{RDD}} operator), adding the value to each keyed datum so it is available inside {{updateStateByKey}}. Not viable: this only updates keys for which new data arrived in the micro-batch, while we also need to update the watermark for keys with no new data in that micro-batch.
> * -Broadcast variable to update a static reference- - Create an additional transform on the {{DStream}}'s RDDs prior to the {{DStream}} operator {{updateStateByKey}}, use a broadcast which does get updated (since this is an {{RDD}} operator), and set the value in a static reference within the executor. Not viable: we cannot ensure that every executor receives partitions to process in each micro-batch.
> * Server polled lazily every micro-batch from within the {{updateStateByKey}} operator - Run a server on a configured port on the driver which serves the current watermarks upon request. Lazily poll this value every micro-batch from within {{updateStateByKey}} and update a static reference within the executor. Viable, but it does not use Spark-native operations, incurs code maintenance, and imposes operational cost on the user (opening ports in firewalls, etc.).
> * Drop/register watermarks as a block in the BlockManager and request the remote version from within the {{updateStateByKey}} operator - Update the watermarks as a block in the driver's BlockManager by dropping and re-registering the block every micro-batch. Lazily poll this value every micro-batch from within {{updateStateByKey}} and update a static reference within the executor. Viable, less "ugly" than the server approach, and requires less operational cost.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
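Both viable options above share the same executor-side shape: poll the driver lazily, at most once per micro-batch, and cache the result in a static reference. A minimal, Spark-free sketch of that caching pattern, assuming a generic fetcher in place of the driver-side HTTP server or BlockManager block read (all names here are illustrative, not Beam or Spark APIs):

```java
import java.util.function.Supplier;

public class LazyWatermarkCache {

    // Executor-local static cache; valid for exactly one micro-batch.
    private static volatile long cachedBatchId = -1;
    private static volatile String cachedWatermarks;

    /**
     * Called from inside the stateful operator on the executor. Performs
     * the remote fetch only when a new micro-batch id is seen; every other
     * call within the same micro-batch reuses the cached value.
     */
    public static synchronized String get(long batchId, Supplier<String> fetchFromDriver) {
        if (batchId != cachedBatchId) {
            cachedWatermarks = fetchFromDriver.get(); // one remote read per batch
            cachedBatchId = batchId;
        }
        return cachedWatermarks;
    }

    public static void main(String[] args) {
        final int[] remoteReads = {0};
        Supplier<String> fetch = () -> {
            remoteReads[0]++;
            return "watermark-for-batch";
        };

        // Many keys processed within one micro-batch: a single remote read.
        for (int key = 0; key < 1000; key++) {
            LazyWatermarkCache.get(42L, fetch);
        }
        System.out.println(remoteReads[0]); // prints 1

        // The next micro-batch triggers exactly one more read.
        LazyWatermarkCache.get(43L, fetch);
        System.out.println(remoteReads[0]); // prints 2
    }
}
```

Because the fetch is pulled from inside the operator rather than pushed via a transform, this works even for executors that receive no partitions in a given micro-batch, which is what ruled out the broadcast-based options.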