flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Global State and Scaling
Date Tue, 22 Aug 2017 07:47:17 GMT
Hi Elias,

You're right, we currently don't support proper broadcast state. We hope to
add support for this in the near future.

The maximum parallelism only affects the keyed state because it defines how
many key groups there are. The key groups are the smallest unit of state
which can be re-partitioned (e.g. due to scaling up/down).
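
To make the key-group idea concrete, here is a minimal sketch in plain Java with no Flink dependency. The class and method names are hypothetical, and the plain hashCode() is an assumption standing in for whatever hash Flink applies internally. It only illustrates how a fixed number of key groups (the maximum parallelism) can be split into contiguous ranges across the current parallel instances.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: hypothetical names, not Flink's own classes.
class KeyGroupSketch {

    // Map a key to a key group in [0, maxParallelism).
    // Assumption: a plain hashCode stands in for Flink's internal hashing.
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Map a key group to the index of the parallel instance that owns it.
    // Integer division hands each instance a contiguous range of key groups.
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // List the key groups owned by one parallel instance.
    public static List<Integer> keyGroupsOwnedBy(int operatorIndex, int maxParallelism, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            if (operatorIndexFor(keyGroup, maxParallelism, parallelism) == operatorIndex) {
                owned.add(keyGroup);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int maxParallelism = 8; // number of key groups, fixed for the job
        for (int parallelism : new int[] {2, 4}) {
            for (int i = 0; i < parallelism; i++) {
                System.out.println("parallelism=" + parallelism + " instance=" + i
                        + " owns key groups " + keyGroupsOwnedBy(i, maxParallelism, parallelism));
            }
        }
    }
}
```

With eight key groups, going from 2 to 4 instances splits each instance's range of four key groups into two ranges of two. Whole key groups move between instances, and a key never changes its key group.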


On Tue, Aug 22, 2017 at 3:02 AM, Elias Levy <fearsome.lucidity@gmail.com>
wrote:

> Looks like Gerard asked something along similar lines
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Managed-operator-state-treating-state-of-all-parallel-operators-as-the-same-td14102.html>
> just last month and that there is a JIRA
> <https://issues.apache.org/jira/browse/FLINK-4940> for official support
> for broadcast state.  Looks like the ugly hack is the way to go for now.
> On Mon, Aug 21, 2017 at 1:23 PM, Elias Levy <fearsome.lucidity@gmail.com>
> wrote:
>> I am implementing a control stream.  The stream communicates a global
>> configuration value for the whole job.  It uses DataStream.broadcast() to
>> communicate this to all parallel operator instances.  I would like to save
>> this value in state so that it can be recovered when the job
>> restarts/recovers.  The control stream is not keyed, so the only option is
>> Operator state.
>> I could implement this using the ListCheckpointed interface, returning
>> Collections.singletonList(configValue) from snapshotState.  It is clear
>> what I'd need to do in restoreState in the case of scale in.  If I include
>> a serial number in the config, and it receives multiple values on restore,
>> it can keep the config value with the largest serial number, indicating the
>> latest config.
>> Alas, it is not clear what should happen on scale out, as some operator
>> instances will receive empty lists.
>> It seems the other alternative is to use CheckpointedFunction, along with
>> union redistribution via getUnionListState, and then have each operator
>> instance select from the union list the config with the latest serial
>> number, of which there should be multiple copies.  But this seems like an
>> ugly hack.
>> In addition, the documentation is unclear on the relationship and effect,
>> if any, of the maximum parallelism Flink job parameter on operator state,
>> whereas it is much clearer in this regard as it relates to keyed state via
>> key groups.
>> How are folks handling this use case, i.e. storing and restoring global
>> config values via Flink state?
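
The union-list workaround described in the quoted message can be sketched roughly as follows. This is plain Java without Flink, and the Config class and method names are hypothetical. In a real job the restore logic would presumably sit in CheckpointedFunction's initializeState, reading the list obtained via getUnionListState. The sketch only shows the selection step: each instance snapshots a single config, every instance sees the union on restore, and each keeps the entry with the largest serial number.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative only: hypothetical names, no Flink dependency.
class UnionRestoreSketch {

    // Hypothetical config record: a serial number plus the config payload.
    static final class Config {
        final long serial;
        final String value;

        Config(long serial, String value) {
            this.serial = serial;
            this.value = value;
        }
    }

    // What each parallel instance would contribute at snapshot time:
    // a single-element list holding its current config.
    public static List<Config> snapshot(Config current) {
        return Collections.singletonList(current);
    }

    // What each parallel instance would do on restore with the union of all
    // snapshots: keep the config with the largest serial number.
    // An empty union (no prior state) yields an empty Optional.
    public static Optional<Config> restore(List<Config> union) {
        return union.stream().max(Comparator.comparingLong((Config c) -> c.serial));
    }

    public static void main(String[] args) {
        // Three old instances snapshotted; one had not yet seen the newest config.
        List<Config> union = Arrays.asList(
                new Config(3, "threshold=10"),
                new Config(7, "threshold=25"),
                new Config(7, "threshold=25"));
        restore(union).ifPresent(c ->
                System.out.println("restored serial=" + c.serial + " value=" + c.value));
    }
}
```

Because every instance receives the full union, the same selection works unchanged whether the job scales in or out, which is the property the single-element ListCheckpointed approach lacks on scale out.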
