From: Bhupesh Chawda
Date: Thu, 2 Mar 2017 15:40:31 +0530
Subject: Re: APEXCORE-619 Recovery windowId in future during application relaunch.
To: dev

What if all the other operators complete their first checkpoint, but the
stateless operator has not crossed the first checkpoint window when the DAG
crashes? If we work out the recovery checkpoint at that point, we might
conclude that checkpoint 1 is the place to restart, and some data might
never get processed by the stateless operator. In that case at-least-once
is probably not guaranteed either?

~ Bhupesh

_______________________________________________________

Bhupesh Chawda

E: bhupesh@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com | apex.apache.org

On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise wrote:

> Dummy checkpoints, continuously writing the committed window id, and the
> like all introduce overhead that is probably not needed.
>
> All the information to derive what we need is likely available, and IMO
> the discussion should be about the correct way of using it. I will have a
> look when I get to it as well.
>
> Thanks,
> Thomas
>
> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde wrote:
>
> > Instead of treating the stateless operator in a special way and missing
> > corner cases, just have a dummy checkpoint; then there is no need to
> > handle corner cases.
> >
> > There is a name for this solution:
> > https://en.wikipedia.org/wiki/Null_Object_pattern
> >
> > On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni wrote:
> >
> > > There is code in various places that deals with stateless operators
> > > in a special way even though a physical checkpoint does not exist on
> > > disk. It is probably a matter of applying a similar thought process
> > > and logic correctly here.
> > >
> > > On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre wrote:
> > >
> > > > hmm!
> > > > the fact that commitWindowId has moved up (right now in the memory
> > > > of Stram) should mean that a complete set of checkpoints is
> > > > available, i.e. commitWindowId can be derived. Let's say the next
> > > > checkpoint window also gets checkpointed across the app, and
> > > > commitWindowId is in memory but not yet written to stram-state;
> > > > then upon relaunch the latest commitWindowId should get computed
> > > > correctly.
> > > >
> > > > This may be just about setting stateless operators to
> > > > commitWindowId on relaunch? aka bug/feature?
> > > >
> > > > Thks
> > > > Amol
> > > >
> > > > E: amol@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
> > > >
> > > > www.datatorrent.com | apex.apache.org
> > > >
> > > > *Join us at Apex Big Data World-San Jose, April 4, 2017!*
> > > > http://www.apexbigdata.com/san-jose-register.html
> > > >
> > > > On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni
> > > > <pramod@datatorrent.com> wrote:
> > > >
> > > > > Do we need to save committedWindowId? Can't it be computed from
> > > > > the existing checkpoints by walking through the DAG? We probably
> > > > > do this anyway, and I suspect there is a minor bug somewhere in
> > > > > there. If an operator is stateless, you could assume its
> > > > > checkpoint to be long max for the sake of computation and
> > > > > compute the committed window as the lowest common checkpoint.
> > > > > If they are all stateless and you end up with long max, you can
> > > > > start with a window id that reflects the current timestamp.
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre wrote:
> > > > >
> > > > > > CommitWindowId could be computed from the existing
> > > > > > checkpoints. That solution still needs the purge to be done
> > > > > > only after commitWindowId is confirmed to be saved in Stram
> > > > > > state.
> > > > > > Without this, the commitWindowId computed from the checkpoints
> > > > > > may have some checkpoints missing.
> > > > > >
> > > > > > Thks
> > > > > > Amol
> > > > > >
> > > > > > On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni
> > > > > > <pramod@datatorrent.com> wrote:
> > > > > >
> > > > > > > Can't the committedWindowId be calculated by looking at the
> > > > > > > physical plan and the existing checkpoints?
> > > > > > >
> > > > > > > On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi
> > > > > > > <tushar@apache.org> wrote:
> > > > > > >
> > > > > > > > Help Needed for APEXCORE-619
> > > > > > > >
> > > > > > > > Issue: When an application is relaunched after a long time
> > > > > > > > with stateless operators at the end of the DAG, the
> > > > > > > > stateless operators start with a very high windowId. In
> > > > > > > > this case the stateless operator ignores all the data it
> > > > > > > > receives until the upstream operator catches up with it.
> > > > > > > > This breaks the *at-least-once* guarantee during relaunch
> > > > > > > > of the operator, or when the master is killed and the
> > > > > > > > application is restarted.
> > > > > > > >
> > > > > > > > Solutions:
> > > > > > > > - Fix the windowId for stateless leaf operators from the
> > > > > > > >   upstream operator. But this has some issues when we have
> > > > > > > >   a join with two upstream operators at different
> > > > > > > >   windowIds.
> > > > > > > >   If we set the windowId to min(upstream windowId), then
> > > > > > > >   we need to recalculate the recovery window ids for the
> > > > > > > >   upstream paths from this operator.
> > > > > > > >
> > > > > > > > - Another solution is to create an empty file in the
> > > > > > > >   checkpoint directory for stateless operators. This will
> > > > > > > >   help us identify the checkpoints of stateless operators
> > > > > > > >   during relaunch instead of computing them from the
> > > > > > > >   latest timestamp.
> > > > > > > >
> > > > > > > > - Bring the entire DAG to committedWindowId. This could be
> > > > > > > >   achieved by writing committedWindowId to a journal. We
> > > > > > > >   need to make sure that we do not purge the checkpointed
> > > > > > > >   state until the committedWindowId is saved in the
> > > > > > > >   journal.
> > > > > > > >
> > > > > > > > Let me know your thoughts on this and your preferred
> > > > > > > > solution.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > -Tushar.
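
For reference, the calculation Pramod suggests earlier in the thread (treat a
stateless operator's checkpoint as long max, take the lowest checkpoint across
operators, and fall back to a timestamp-based window id when everything is
stateless) could be sketched roughly as below. This is only an illustration in
Java: the class, method and operator names are hypothetical and are not taken
from the Apex code base, and "lowest common checkpoint" is simplified here to
the minimum of each operator's latest checkpoint window id, without walking
the DAG.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch; none of these names come from the Apex code base.
class RecoveryWindowSketch {

  // Placeholder checkpoint for a stateless operator: it never lowers the minimum.
  static final long STATELESS = Long.MAX_VALUE;

  /**
   * latestCheckpoints maps an operator name to the window id of its latest
   * checkpoint, or to an empty OptionalLong when the operator is stateless.
   * fallbackWindowId is what the caller wants to use when every operator is
   * stateless (for example a window id derived from the current timestamp).
   */
  static long committedWindowId(Map<String, OptionalLong> latestCheckpoints,
                                long fallbackWindowId) {
    long committed = STATELESS;
    for (OptionalLong checkpoint : latestCheckpoints.values()) {
      // A stateless operator contributes long max, so only stateful
      // operators can pull the result down.
      committed = Math.min(committed, checkpoint.orElse(STATELESS));
    }
    // Still long max means no stateful operator was seen at all.
    return committed == STATELESS ? fallbackWindowId : committed;
  }

  public static void main(String[] args) {
    Map<String, OptionalLong> dag = new LinkedHashMap<>();
    dag.put("input", OptionalLong.of(120L));
    dag.put("aggregate", OptionalLong.of(100L));
    dag.put("console", OptionalLong.empty()); // stateless leaf operator
    System.out.println(committedWindowId(dag, 999L)); // prints 100
  }
}
```

With this simplification a stateless operator never lowers the result, which
is exactly the situation the reply at the top of this mail asks about.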