From: Thomas Weise
Date: Tue, 25 Apr 2017 08:42:46 -0700
Subject: Re: Specialized operator recovery
To: dev@apex.apache.org
Pramod,

Sounds like some sort of alternative "processing mode" that, from the engine's perspective, allows potentially inconsistent state when there is a pipeline failure. This is of course only something the user can decide.

Does the proposal assume that the operator state is immutable (or what is sometimes tagged with the stateless annotation)? For example, an operator that has to load a large amount of state from another source before it can process the first tuple? Also, it would be an optimization but not something that will help with SLA if the operator still needs to be recovered when its own container fails. It might help to clarify that, and also why there is a need to recover in the batch use case (vs. reprocess).

Thanks,
Thomas

On Mon, Apr 24, 2017 at 8:57 AM, Pramod Immaneni wrote:

> In a failure scenario, when a container fails, it is redeployed along with
> all the operators in it. The operators downstream of these operators are
> also redeployed within their containers. The operators are restored from
> their checkpoints and connect to the appropriate point in the stream
> according to the processing mode. In at-least-once mode, for example, the
> data is replayed from the same checkpoint.
>
> Restoring an operator's state from a checkpoint can turn out to be a costly
> operation depending on the size of the state. In some use cases, based on
> the operator logic, when there is an upstream failure, the operator state,
> without being restored to the checkpoint (i.e., remaining as is), will
> still produce the same results with the data replayed from the last fully
> processed window. This is true of some operators in batch use cases.
> The operator state can remain the same as it was before the upstream
> failure by reusing the same operator instance from before, with only the
> streams and window reset to the window after the last fully processed
> window, to guarantee at-least-once processing of tuples. If the container
> where the operator itself is running goes down, it would of course need to
> be restored from the checkpoint.
>
> I would like to propose adding the ability for a user to explicitly
> identify operators of this type, and the corresponding functionality in
> the engine to handle their recovery in the way described above: by not
> restoring their state from the checkpoint, reusing the instance, and
> restoring the stream to the window after the last fully processed window
> for the operator. When operators are not identified as this type, the
> default behavior is what it is today and nothing changes.
>
> I have done some prototyping on the engine side to ensure that this is
> possible with our current code base without requiring a massive overhaul,
> especially the restoration of the operator instance within the Node in the
> streaming container and the re-establishment of the subscriber stream to a
> window in the buffer server that the publisher (upstream) hasn't yet
> reached (as it would be restarting from checkpoint), and I have been able
> to get it all working successfully.
>
> Thanks
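To make the proposed recovery semantics concrete, here is a minimal, self-contained sketch (plain Python, not Apex code; `SumOperator`, `run`, and all other names are hypothetical) of why reusing the live operator instance and replaying from the window after the last fully processed one can produce the same results as the default checkpoint-restore-plus-replay, for a deterministic operator whose state is a pure function of the windows processed so far:

```python
# Hypothetical illustration of the two recovery paths; not the Apex API.
from dataclasses import dataclass

@dataclass
class SumOperator:
    """A deterministic operator whose state is a running total."""
    total: int = 0

    def process_window(self, window_id, tuples):
        self.total += sum(tuples)
        return (window_id, self.total)  # result emitted for this window

def run(op, windows):
    """Process a sequence of (window_id, tuples) pairs in order."""
    return [op.process_window(w, data) for w, data in windows]

windows = [(w, [w, w + 1]) for w in range(10)]  # synthetic input
CHECKPOINT = 3       # last committed checkpoint (state covers windows 0..3)
LAST_PROCESSED = 6   # last fully processed window before the upstream failure

# The live instance had processed through window 6 when upstream failed.
live = SumOperator()
run(live, windows[: LAST_PROCESSED + 1])

# Default recovery: restore state from the checkpoint and replay from there.
restored = SumOperator()
run(restored, windows[: CHECKPOINT + 1])        # reconstruct checkpointed state
default_out = run(restored, windows[CHECKPOINT + 1 :])

# Proposed recovery: reuse the live instance, replay from window 7 onward.
reuse_out = run(live, windows[LAST_PROCESSED + 1 :])

# Both paths emit identical results from window 7 onward and converge to the
# same final state; the default path merely redoes windows 4..6.
tail = default_out[LAST_PROCESSED - CHECKPOINT :]
assert tail == reuse_out
assert restored.total == live.total
```

The equivalence holds only because this operator is deterministic and its state after window N depends solely on windows 0..N; that is exactly the property the user would be asserting when explicitly marking an operator for this recovery mode, which is why the engine cannot infer it on its own.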