flink-user mailing list archives

From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: OperatorState partioning when recovering from failure
Date Thu, 04 May 2017 16:25:06 GMT

The repartitioning does indeed happen in a round-robin fashion (see RoundRobinOperatorStateRepartitioner).
It is performed on restore by the checkpoint coordinator in the master, by redistributing
state handles. The state those handles point to is a black box at that level, so the only
assumption we can make is that all partitions can be redistributed freely. If you want
additional constraints on the repartitioning, you have to apply them when handing over
the state partitions, i.e. the partitioning into the list state must already group together
state partitions that should not end up on separate machines after a restore.
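
To illustrate the point about grouping, here is a minimal, self-contained sketch in plain Java. It does not use Flink and is not the code of RoundRobinOperatorStateRepartitioner; the class name RoundRobinSketch, the method redistribute, and the split names are made up for the example. It only assumes that the list elements of all old subtasks are collected and dealt out one by one to the new subtasks, with each element treated as opaque.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not Flink's actual implementation.
public class RoundRobinSketch {

    /**
     * Collects every state partition (list element) from the old subtasks and
     * deals them out one at a time to the new subtasks. Partitions are opaque:
     * the method never looks inside an element.
     */
    public static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> subtaskState : oldSubtaskStates) {
            for (T partition : subtaskState) {
                result.get(next).add(partition);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One list element per split: splits of the same file can be separated.
        List<List<String>> perSplit = Arrays.asList(
                Arrays.asList("a-1", "a-2"),
                Arrays.asList("b-1", "b-2"));
        System.out.println(redistribute(perSplit, 2));

        // One list element per bundle of splits: a bundle always moves as a unit.
        List<List<List<String>>> bundled = Arrays.asList(
                Arrays.asList(Arrays.asList("a-1", "a-2")),
                Arrays.asList(Arrays.asList("b-1", "b-2")));
        System.out.println(redistribute(bundled, 2));
    }
}
```

In the first call, a-1 and a-2 land on different subtasks. In the second call, each bundle is a single list element, so its splits stay together wherever the bundle ends up. The same idea applies to whatever unit you want to keep on one machine.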


> On 04.05.2017 at 17:29, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>
> Hi Seth,
>
> Upon restoring, splits will be re-shuffled among the new tasks, and I believe that state
> is repartitioned in a round-robin way (although I am not 100% sure, so I am also including
> Stefan and Aljoscha in this).
>
> The priority queues will be reconstructed based on the restored elements. So task managers
> may get a relatively equal number of splits, but "recent" ones may be concentrated on a few
> nodes. This may also have to do with how your monitor sends them to the reader (e.g. all
> splits of a recent file go to the same node).
>
> As far as I know, we do not have an option for a custom state repartitioner.
>
> To see what is restored, you can enable DEBUG logging, which will print something like
> the following upon restoring:
>
> "ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored {restoredReaderState}"
>
> with the restoredReaderState containing the restored splits.
>
> Something similar is printed upon checkpointing. This will give you a better look into
> what may be happening.
>
> Thanks,
> Kostas
>
>> On May 4, 2017, at 3:45 PM, Seth Wiesman <swiesman@mediamath.com> wrote:
>>
>> I am curious about how operator state is repartitioned to subtasks when a job is
>> resumed from a checkpoint or savepoint. The reason is that I am having issues with
>> the ContinuousFileReaderOperator when recovering from a failure.
>>
>> I consume most of my data from files off S3. I have a custom file monitor that understands
>> how to walk my directory structure and outputs TimestampedFileSplits downstream in
>> chronological order to the stock ContinuousFileReaderOperator. The reader consumes those
>> splits and stores them in a priority queue based on their last modified time, ensuring that
>> files are read in chronological order, which is exactly what I want. The problem is that,
>> when recovering, the unread splits being partitioned out to each of the subtasks seem to be
>> heavily skewed in terms of last modified time.
>>
>> While each task may have a similar number of files, I find that one or two will have
>> a disproportionate number of old files. This in turn holds back my watermark (sometimes
>> for several hours, depending on the number of unread splits), which keeps timers from
>> firing, windows from purging, etc.
>>
>> I was hoping there was some way I could add a custom partitioner to ensure that
>> splits are uniformly distributed in a temporal manner, or that someone had other ideas
>> for how I could mitigate the problem.
>>
>> Thank you,
>> Seth Wiesman
