From: Fabian Hueske
Date: Thu, 7 Sep 2017 09:44:03 +0200
Subject: Re: State Maintenance
To: Navneeth Krishnan
Cc: "Tzu-Li (Gordon) Tai", user

Hi Navneeth,

there's a lower-level state interface that should address your requirements: OperatorStateStore.getUnionListState()

This union list state is similar to the regular operator list state, but instead of splitting the list for recovery and handing out the splits to the operator instances, it restores the complete list on each operator instance.
So it basically does a broadcast restore. If all operator instances have the same state, only one instance should checkpoint its state; that state is then restored to all instances in case of a failure. This should also work with rescaling.
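
To make the difference between the two restore modes concrete, here is a small plain-Java model. It is not Flink code: the class RestoreModes and both method names are made up for illustration, and the round-robin assignment in the split case is an assumption, since the exact assignment Flink uses may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the two restore modes; this is NOT Flink code.
public class RestoreModes {

    // Split restore: all checkpointed entries are dealt out round-robin,
    // so each new instance sees only a share of the list.
    // (Round-robin is an assumption made for this sketch.)
    public static <T> List<List<T>> splitRestore(List<List<T>> checkpointed, int parallelism) {
        List<List<T>> restored = new ArrayList<List<T>>();
        for (int i = 0; i < parallelism; i++) {
            restored.add(new ArrayList<T>());
        }
        int next = 0;
        for (List<T> part : checkpointed) {
            for (T entry : part) {
                restored.get(next % parallelism).add(entry);
                next++;
            }
        }
        return restored;
    }

    // Union restore: every new instance receives the concatenation of
    // all checkpointed lists (the "broadcast restore").
    public static <T> List<List<T>> unionRestore(List<List<T>> checkpointed, int parallelism) {
        List<T> union = new ArrayList<T>();
        for (List<T> part : checkpointed) {
            union.addAll(part);
        }
        List<List<T>> restored = new ArrayList<List<T>>();
        for (int i = 0; i < parallelism; i++) {
            restored.add(new ArrayList<T>(union));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Two instances at checkpoint time; only the first one wrote state.
        List<List<String>> checkpointed = new ArrayList<List<String>>();
        checkpointed.add(Arrays.asList("cfgA", "cfgB"));
        checkpointed.add(new ArrayList<String>());

        // Rescale from 2 to 3 instances.
        System.out.println("split: " + splitRestore(checkpointed, 3));
        System.out.println("union: " + unionRestore(checkpointed, 3));
    }
}
```

In this model the split restore leaves the third instance without any entries, while the union restore gives every instance both entries, which is the behavior you want for state that must be identical everywhere.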

The operator instance that checkpoints can be identified by (RuntimeContext.getIndexOfThisSubtask() == 0).

The OperatorStateStore is a bit hidden. You have to implement the CheckpointedFunction interface. When CheckpointedFunction.initializeState(FunctionInitializationContext context) is called, the context provides a getOperatorStateStore() method.
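
The resulting lifecycle can be sketched without any Flink dependency. The model below is an assumption-laden illustration, not the Flink API: ConfigOperator, snapshot(), restore() and union() are hypothetical names that stand in for what CheckpointedFunction.snapshotState() and initializeState() would do with a union list state.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the checkpoint/restore lifecycle; NOT Flink code.
// ConfigOperator and all of its methods are hypothetical names.
public class ConfigOperator {
    private final int subtaskIndex;
    private final List<String> config = new ArrayList<String>();

    public ConfigOperator(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
    }

    // Models an update message arriving on the config stream.
    public void update(String entry) {
        config.add(entry);
    }

    public List<String> getConfig() {
        return config;
    }

    // Stands in for CheckpointedFunction.snapshotState(): only subtask 0
    // contributes entries, every other instance writes an empty list.
    public List<String> snapshot() {
        if (subtaskIndex == 0) {
            return new ArrayList<String>(config);
        }
        return new ArrayList<String>();
    }

    // Stands in for CheckpointedFunction.initializeState(): in Flink the
    // list would be obtained from
    // context.getOperatorStateStore().getUnionListState(...).
    public void restore(List<String> unionOfAllSnapshots) {
        config.clear();
        config.addAll(unionOfAllSnapshots);
    }

    // Concatenates the snapshots of all instances, as a union restore would.
    public static List<String> union(List<ConfigOperator> operators) {
        List<String> all = new ArrayList<String>();
        for (ConfigOperator op : operators) {
            all.addAll(op.snapshot());
        }
        return all;
    }

    public static void main(String[] args) {
        // Three instances that all hold the same two config entries.
        List<ConfigOperator> before = new ArrayList<ConfigOperator>();
        for (int i = 0; i < 3; i++) {
            ConfigOperator op = new ConfigOperator(i);
            op.update("cfgA");
            op.update("cfgB");
            before.add(op);
        }
        List<String> checkpoint = union(before);

        // Restore with a different parallelism (5 instances).
        for (int i = 0; i < 5; i++) {
            ConfigOperator op = new ConfigOperator(i);
            op.restore(checkpoint);
            System.out.println("subtask " + i + ": " + op.getConfig());
        }
    }
}
```

The index check matters because a union restore concatenates everything that was written. If all three instances in this model wrote their copy, each restored instance would receive six entries instead of two.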

I'd recommend having a look at the detailed JavaDocs of all involved classes and methods.

Hope this helps,
Fabian


2017-09-05 19:35 GMT+02:00 Navneeth Krishnan <reachnavneeth2@gmail.com>:

> Thanks Gordon for your response. I have around 80 parallel flatmap operator
> instances and each instance requires 3 states. One of them is user state, in
> which each operator will have unique users' data, and I need this data to be
> queryable. The other two states are kind of static states which are only
> modified when there is an update message in the config stream. This static
> data could easily be around 2GB. In my previous approach I used operator
> state, where the data is retrieved inside the open method across all operator
> instances but checkpointed only inside one of the operator instances.
>
> One issue I have: if I change the operator parallelism, how would it affect
> the internal state?
>
> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
>
>> Hi Navneeth,
>>
>> Answering your three questions separately:
>>
>> 1. Yes. Your MapState will be backed by RocksDB, so when removing an entry
>> from the map state, the state will be removed from the local RocksDB as
>> well.
>>
>> 2. If state classes are not POJOs, they will be serialized by Kryo, unless a
>> custom serializer is explicitly specified. You can take a look at this
>> document on how to do that [1].
>>
>> 3. I might need more information to be able to suggest something suitable
>> for this one. How are you using the "huge state values"? From what you
>> described, it seems like you only need them on one of the parallel
>> instances, so I'm a bit curious about what they are actually used for. Are
>> they needed when processing your records?
>>
>> Cheers,
>> Gordon
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/