From: aljoscha
To: issues@flink.incubator.apache.org
Subject: [GitHub] flink pull request #3508: [FLINK-5991] [state-backend, streaming] Expose Bro...
Date: Tue, 4 Apr 2017 09:03:22 +0000 (UTC)

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3508#discussion_r109610444

    --- Diff: docs/dev/stream/state.md ---
    @@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream

     ## Using Managed Operator State

    -A stateful function can implement either the more general `CheckpointedFunction`
    +To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
     interface, or the `ListCheckpointed` interface.

    -In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
    -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
    -non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
    -contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
    -while `(test2, 2)` will go to task 1.
    -
    -##### ListCheckpointed
    +#### CheckpointedFunction

    -The `ListCheckpointed` interface requires the implementation of two methods:
    -
    -{% highlight java %}
    -List snapshotState(long checkpointId, long timestamp) throws Exception;
    -
    -void restoreState(List state) throws Exception;
    -{% endhighlight %}
    -
    -On `snapshotState()` the operator should return a list of objects to checkpoint and
    -`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
    -return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
    -
    -##### CheckpointedFunction
    -
    -The `CheckpointedFunction` interface also requires the implementation of two methods:
    +The `CheckpointedFunction` interface provides access to non-keyed state with different
    +redistribution schemes. It requires the implementation of two methods:

     {% highlight java %}
     void snapshotState(FunctionSnapshotContext context) throws Exception;

     void initializeState(FunctionInitializationContext context) throws Exception;
     {% endhighlight %}

    -Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
    -or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
    +Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
    +is called every time the user-defined function is initialized, be that when the function is first initialized
    +or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
     only the place where different types of state are initialized, but also where state recovery logic is included.
    -This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
    -uses state to buffer elements before sending them to the outside world:
    +Currently, list-style managed operator state is supported. The state
    +is expected to be a `List` of *serializable* objects, independent from each other,
    +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
    +non-keyed state can be redistributed. Depending on the state accessing method,
    +the following redistribution schemes are defined:
    +
    +  - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
    --- End diff --

Could use "Round-Robin redistribution". Maybe...
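
On the naming question in the comment above, here is a minimal sketch in plain Java (no Flink dependency) of two ways a concatenated list of operator state elements could be handed out to a new set of parallel tasks. The class name, the method names, and the sample elements `s0` to `s4` are hypothetical and exist only for illustration. This is not Flink's redistribution code, and the quoted diff does not say which of the two behaviours the PR implements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Flink.
final class RedistributionSketch {

    // Contiguous split: each task gets one consecutive chunk of the
    // concatenated state; chunk sizes differ by at most one element.
    public static <T> List<List<T>> contiguousSplit(List<T> state, int parallelism) {
        List<List<T>> result = new ArrayList<>();
        int base = state.size() / parallelism;
        int remainder = state.size() % parallelism;
        int start = 0;
        for (int task = 0; task < parallelism; task++) {
            // The first `remainder` tasks take one extra element.
            int size = base + (task < remainder ? 1 : 0);
            result.add(new ArrayList<>(state.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Round-robin: element i goes to task (i % parallelism).
    public static <T> List<List<T>> roundRobin(List<T> state, int parallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < state.size(); i++) {
            result.get(i % parallelism).add(state.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed sample state: five independent elements, rescaled to parallelism 2.
        List<String> state = Arrays.asList("s0", "s1", "s2", "s3", "s4");
        System.out.println(contiguousSplit(state, 2)); // [[s0, s1, s2], [s3, s4]]
        System.out.println(roundRobin(state, 2));      // [[s0, s2, s4], [s1, s3]]
    }
}
```

With only two elements and parallelism 2, as in the `(test1, 2)` / `(test2, 2)` example from the removed paragraph, both schemes give the same assignment. They can only differ once some task receives more than one element, so the choice of name matters mainly for how readers picture larger states.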