From: Navneeth Krishnan
Date: Thu, 7 Sep 2017 08:01:31 -0700
Subject: Re: State Maintenance
To: Fabian Hueske
Cc: "Tzu-Li (Gordon) Tai", user

Will I be able to use both queryable MapState and union list state while implementing the CheckpointedFunction interface? One of my major requirements for that operator is to provide queryable state, and computing that state requires the common static state across all parallel operator instances.

Thanks.

On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske <fhueske@gmail.com> wrote:

Hi Navneeth,

There's a lower-level state interface that should address your requirements: OperatorStateStore.getUnionListState()

This union list state is similar to the regular operator list state, but instead of splitting the list for recovery and handing out splits to the operator instances, it restores the complete list on each operator instance.
So it basically does a broadcast restore. If all operator instances have the same state, only one instance should checkpoint its state, and this state is restored to all other instances in case of a failure. This should also work with rescaling.
The instance that checkpoints can be identified by (RuntimeContext.getIndexOfThisSubtask() == 0).
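
To make the difference between the two restore modes concrete, here is a small, dependency-free Java model. It does not use Flink, and the class and method names are hypothetical. The even-split variant follows the Flink documentation's description of regular operator list state (the checkpointed lists are logically concatenated and divided evenly among the new instances); the exact slice order chosen here is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Dependency-free model of the two operator list state redistribution schemes.
// Class and method names are hypothetical; this is not Flink's API.
class RedistributionModel {

    // Both schemes start from the logical concatenation of all checkpointed lists.
    static <T> List<T> concat(List<List<T>> snapshots) {
        List<T> all = new ArrayList<>();
        for (List<T> s : snapshots) {
            all.addAll(s);
        }
        return all;
    }

    // Regular (even-split) list state: each new instance gets one
    // contiguous, near-equal slice of the concatenated list.
    static <T> List<List<T>> evenSplit(List<List<T>> snapshots, int newParallelism) {
        List<T> all = concat(snapshots);
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism; // the first `extra` slices get one more element
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int len = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + len)));
            start += len;
        }
        return result;
    }

    // Union list state: every new instance gets the complete list ("broadcast restore").
    static <T> List<List<T>> union(List<List<T>> snapshots, int newParallelism) {
        List<T> all = concat(snapshots);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Three old instances; only subtask 0 wrote the static config.
        List<List<String>> snapshots = new ArrayList<>();
        snapshots.add(Arrays.asList("cfgA", "cfgB", "cfgC"));
        snapshots.add(new ArrayList<>());
        snapshots.add(new ArrayList<>());

        // Restore with parallelism 2 (rescaled from 3).
        System.out.println(evenSplit(snapshots, 2)); // [[cfgA, cfgB], [cfgC]]
        System.out.println(union(snapshots, 2));     // [[cfgA, cfgB, cfgC], [cfgA, cfgB, cfgC]]
    }
}
```

With even-split, a config written by a single instance ends up scattered across the new instances; with union, every instance gets all of it, which is why union state suits identical static data.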

The OperatorStateStore is a bit hidden. You have to implement the CheckpointedFunction interface. When CheckpointedFunction.initializeState(FunctionInitializationContext context) is called, the context provides a method getOperatorStateStore().
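
Fabian's advice to checkpoint from only one instance matters because union state concatenates every instance's list on restore: if all N instances wrote the same config, each restored instance would see N copies. The plain-Java sketch below models that effect. `snapshotState` and `restoreUnion` are hypothetical stand-ins named after the CheckpointedFunction hooks, not Flink's actual signatures; in a real job the index would come from RuntimeContext.getIndexOfThisSubtask().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Model of N parallel instances that all hold the same static config and
// checkpoint it into union list state. Method names echo CheckpointedFunction's
// hooks, but these signatures are hypothetical plain Java, not Flink's API.
class UnionSnapshotModel {

    // What one instance writes into its union list state on a checkpoint.
    static List<String> snapshotState(int indexOfThisSubtask, List<String> config,
                                      boolean onlySubtaskZero) {
        if (onlySubtaskZero && indexOfThisSubtask != 0) {
            return new ArrayList<>(); // every other instance writes nothing
        }
        return new ArrayList<>(config);
    }

    // Union restore: each instance receives the concatenation of all snapshots.
    static List<String> restoreUnion(List<List<String>> snapshots) {
        List<String> restored = new ArrayList<>();
        for (List<String> s : snapshots) {
            restored.addAll(s);
        }
        return restored;
    }

    // Checkpoint `parallelism` instances, then return what any one instance sees on restore.
    static List<String> checkpointAndRestore(int parallelism, List<String> config,
                                             boolean onlySubtaskZero) {
        List<List<String>> snapshots = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            snapshots.add(snapshotState(i, config, onlySubtaskZero));
        }
        return restoreUnion(snapshots);
    }

    public static void main(String[] args) {
        List<String> config = Arrays.asList("rule-1", "rule-2");
        System.out.println(checkpointAndRestore(3, config, true));         // [rule-1, rule-2]
        System.out.println(checkpointAndRestore(3, config, false).size()); // 6 (three copies)
    }
}
```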

I'd recommend having a look at the detailed JavaDocs of all involved classes and methods.

Hope this helps,
Fabian


2017-09-05 19:35 GMT+02:00 Navneeth Krishnan <reachnavneeth2@gmail.com>:
Thanks, Gordon, for your response. I have around 80 parallel flatMap operator instances, and each instance requires 3 states. One of them is user state, in which each operator instance holds its own unique users' data, and I need this data to be queryable. The other two are essentially static states, which are only modified when there is an update message in the config stream. This static data could easily be around 2 GB. In my previous approach I used operator state: the data was retrieved inside the open method on all operator instances but checkpointed by only one of the operator instances.
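
A quick back-of-envelope calculation with the figures in this message (about 2 GB of static data, 80 instances) shows what checkpointing from a single instance saves and what a union restore still costs. The inputs come from the message; the helper class is purely illustrative.

```java
// Back-of-envelope sizing for the static state described above.
// Inputs come from the message (about 2 GB, 80 instances); the helper is illustrative.
class StaticStateSizing {

    // Static data written per checkpoint.
    static double checkpointGb(double staticGb, int parallelism, boolean onlySubtaskZero) {
        return onlySubtaskZero ? staticGb : staticGb * parallelism;
    }

    // With union restore every instance receives the complete list,
    // so recovery hands out one full copy per instance.
    static double unionRestoreTotalGb(double staticGb, int parallelism) {
        return staticGb * parallelism;
    }

    public static void main(String[] args) {
        System.out.println(checkpointGb(2.0, 80, false)); // 160.0
        System.out.println(checkpointGb(2.0, 80, true));  // 2.0
        System.out.println(unionRestoreTotalGb(2.0, 80)); // 160.0
    }
}
```

So checkpointing only from subtask 0 shrinks the checkpointed static data by a factor equal to the parallelism, while a restore still delivers the full 2 GB to each of the 80 instances.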

One of the issues I have: if I change the operator parallelism, how would that affect the internal state?
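
For the queryable user state, which is keyed state, rescaling works differently from operator state. As far as we understand Flink's design, keyed state is partitioned into a fixed number of key groups (the max parallelism), a key's key group depends only on the key, and changing the parallelism reassigns whole key groups to instances. The minimal model below assumes a max parallelism of 128 and takes key-group ids directly (Flink derives them by hashing the key); the class name is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of keyed-state rescaling with key groups, as we understand
// Flink's design: key group k is owned by operator index
// k * parallelism / maxParallelism. Flink derives the key group by hashing the
// key; this sketch starts from key-group ids. Not Flink's API.
class KeyGroupModel {

    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("need 1 <= parallelism <= maxParallelism");
        }
        return keyGroupId * parallelism / maxParallelism;
    }

    // Number of distinct instances that own at least one key group.
    static int ownerCount(int maxParallelism, int parallelism) {
        Set<Integer> owners = new HashSet<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            owners.add(operatorIndexForKeyGroup(maxParallelism, parallelism, kg));
        }
        return owners.size();
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration
        // Key group 64 moves from instance 40 to instance 50 when scaling 80 -> 100.
        System.out.println(operatorIndexForKeyGroup(maxParallelism, 80, 64));  // 40
        System.out.println(operatorIndexForKeyGroup(maxParallelism, 100, 64)); // 50
        System.out.println(ownerCount(maxParallelism, 100));                   // 100
    }
}
```

Because whole key groups move, a user's entries stay together after rescaling; they are simply served by a different instance.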


On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
Hi Navneeth,

Answering your three questions separately:

1. Yes. Your MapState will be backed by RocksDB, so when you remove an entry from the map state, it will be removed from the local RocksDB as well.
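
A toy model of point 1, assuming (for illustration) that each map entry is stored under its own composite key made of the state key and the map key, so removing one map entry deletes exactly one record from the local store. The TreeMap stands in for RocksDB; nothing here is Flink's or RocksDB's API, and the class name is hypothetical.

```java
import java.util.TreeMap;

// Toy model of a local key-value store standing in for RocksDB under a keyed MapState.
// Assumption for illustration: every map entry is stored under its own composite
// store key (state key + map key). This is not Flink's or RocksDB's API.
class MapStateKvModel {
    private final TreeMap<String, String> store = new TreeMap<>();

    // Length-prefix the state key so different (stateKey, mapKey) pairs never collide.
    private static String compositeKey(String stateKey, String mapKey) {
        return stateKey.length() + ":" + stateKey + mapKey;
    }

    void put(String stateKey, String mapKey, String value) {
        store.put(compositeKey(stateKey, mapKey), value);
    }

    // Removing a map entry deletes the corresponding record from the store.
    void remove(String stateKey, String mapKey) {
        store.remove(compositeKey(stateKey, mapKey));
    }

    boolean contains(String stateKey, String mapKey) {
        return store.containsKey(compositeKey(stateKey, mapKey));
    }

    int storeSize() {
        return store.size();
    }

    public static void main(String[] args) {
        MapStateKvModel state = new MapStateKvModel();
        state.put("user-1", "sessionA", "x");
        state.put("user-1", "sessionB", "y");
        state.remove("user-1", "sessionA");
        System.out.println(state.storeSize()); // 1
    }
}
```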

2. If state classes are not POJOs, they will be serialized by Kryo, unless a custom serializer is explicitly specified. You can take a look at this document on how to do that [1].
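
Whether a state class takes the POJO path or falls back to Kryo depends on Flink's documented POJO rules: the class is public and standalone (nested classes must be static), it has a public no-argument constructor, and every non-static, non-transient field is either public and non-final or reachable through a public getter and setter. The reflection check below is a rough approximation of those rules for illustration only; it is not Flink's type extractor, and the example classes are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Rough approximation of the POJO rules in Flink's documentation; Flink's own
// type extraction checks more cases. Not Flink's API; for illustration only.
class PojoRulesCheck {

    static boolean looksLikeFlinkPojo(Class<?> c) {
        int mods = c.getModifiers();
        // The class must be public and standalone (nested classes must be static).
        if (!Modifier.isPublic(mods)) return false;
        if (c.isMemberClass() && !Modifier.isStatic(mods)) return false;
        // It needs a public no-argument constructor.
        try {
            c.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Every non-static, non-transient field (including inherited ones) must be
        // public and non-final, or have a public getter and setter.
        for (Class<?> k = c; k != null && k != Object.class; k = k.getSuperclass()) {
            for (Field f : k.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
                if (Modifier.isPublic(fm) && !Modifier.isFinal(fm)) continue;
                if (!hasGetterAndSetter(c, f)) return false;
            }
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> c, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = hasPublicMethod(c, "get" + suffix)
                || (f.getType() == boolean.class && hasPublicMethod(c, "is" + suffix));
        return getter && hasPublicMethod(c, "set" + suffix, f.getType());
    }

    private static boolean hasPublicMethod(Class<?> c, String name, Class<?>... params) {
        try {
            c.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}

// Hypothetical example types.
class PojoExamples {
    public static class UserEvent {       // public fields + implicit public no-arg constructor
        public String userId;
        public long timestamp;
    }

    public static class Counter {         // private field exposed through getter and setter
        private int count;
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }

    public static class ImmutableConfig { // no no-arg constructor and no setter: not a POJO
        private final String id;
        public ImmutableConfig(String id) { this.id = id; }
        public String getId() { return id; }
    }
}
```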

3. I might need more information to make a proper suggestion for this one. How are you using the "huge state values"? From what you described, it seems you only need them on one of the parallel instances, so I'm a bit curious what they are actually used for. Are they needed when processing your records?

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state