flink-user mailing list archives

From: Kostas Kloudas <k.klou...@data-artisans.com>
Subject: Re: OperatorState partitioning when recovering from failure
Date: Thu, 04 May 2017 15:29:28 GMT
Hi Seth,

Upon restoring, splits will be re-shuffled among the new tasks, and I believe that state is
repartitioned in a round-robin way (although I am not 100% sure, so I am also including Stefan
and Aljoscha in this). The priority queues are then reconstructed from the restored elements.
So each task manager may get a relatively equal number of splits, but the “recent” ones may be
concentrated on a few nodes. This may also have to do with how your monitor sends splits to
the reader (e.g. all splits of a recent file going to the same node).
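
For reference, this is the behavior you get from plain (non-keyed) operator list state: upon
restore, the runtime splits the global list and gives each subtask a roughly even slice, with
no control over which elements land where. A minimal sketch (not tested, and with Split as a
simplified stand-in for the actual split type):

import java.io.Serializable;
import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class PendingSplits implements CheckpointedFunction {

    // Simplified stand-in for the real timestamped split type.
    public static class Split implements Serializable {
        public String path;
        public long modificationTime;
        public long getModificationTime() { return modificationTime; }
    }

    private transient ListState<Split> checkpointedSplits;

    // Splits waiting to be read, ordered oldest-first by modification time.
    private final PriorityQueue<Split> pending =
        new PriorityQueue<>(Comparator.comparingLong(Split::getModificationTime));

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        checkpointedSplits.clear();
        for (Split split : pending) {
            checkpointedSplits.add(split);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedSplits = ctx.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("pending-splits", Split.class));
        // On restore, this subtask receives an arbitrary slice of the global
        // list (roughly even in count), with no guarantee about which splits,
        // old or recent, land here.
        for (Split split : checkpointedSplits.get()) {
            pending.add(split);
        }
    }
}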

As far as I know, we do not have an option for a custom state re-partitioner.
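
One workaround you could try (sketch only, and assuming a Flink version that exposes
getUnionListState; union redistribution is only becoming part of the public API around the 1.3
line, if I remember correctly): snapshot the splits into union list state, so that on restore
every subtask sees the complete list and can deterministically pick a temporally even share.
Assuming the function above is also a RichFunction, so that getRuntimeContext() is available,
its initializeState() could become:

// Additionally requires java.util.ArrayList and java.util.List.
@Override
public void initializeState(FunctionInitializationContext ctx) throws Exception {
    // Union redistribution: every subtask gets the COMPLETE restored list.
    checkpointedSplits = ctx.getOperatorStateStore().getUnionListState(
        new ListStateDescriptor<>("pending-splits", Split.class));

    List<Split> all = new ArrayList<>();
    for (Split split : checkpointedSplits.get()) {
        all.add(split);
    }
    // Sort globally by modification time, then take every n-th element so
    // each subtask ends up with a temporally even slice.
    all.sort(Comparator.comparingLong(Split::getModificationTime));

    int subtask = getRuntimeContext().getIndexOfThisSubtask();
    int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    for (int i = subtask; i < all.size(); i += parallelism) {
        pending.add(all.get(i));
    }
}

The trade-off is that the full split list is shipped to every subtask on restore, so this is
only reasonable as long as the number of pending splits stays moderate.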

To see what is restored, you can enable DEBUG logging, which will print something like the
following upon restoring:

"ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored {restoredReaderState}"

where restoredReaderState contains the restored splits.

Something similar is printed upon checkpointing. This should give you a better view into what
may be happening.
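
To get that output, something along these lines in your log4j.properties should be enough
(the logger name assumes ContinuousFileReaderOperator lives under
org.apache.flink.streaming.api.functions.source, so please double-check against your Flink
version):

# log4j.properties: DEBUG for the file monitoring/reading operators only,
# so the rest of the job keeps logging at INFO.
log4j.logger.org.apache.flink.streaming.api.functions.source=DEBUG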

Thanks,
Kostas

> On May 4, 2017, at 3:45 PM, Seth Wiesman <swiesman@mediamath.com> wrote:
> 
> I am curious about how operator state is repartitioned to subtasks when a job is resumed
> from a checkpoint or savepoint. The reason is that I am having issues with the
> ContinuousFileReaderOperator when recovering from a failure.
>  
> I consume most of my data from files off S3. I have a custom file monitor that understands
> how to walk my directory structure and outputs TimestampedFileSplits downstream in
> chronological order to the stock ContinuousFileReaderOperator. The reader consumes those
> splits and stores them in a priority queue based on their last modified time, ensuring that
> files are read in chronological order, which is exactly what I want. The problem is that
> when recovering, the unread splits partitioned out to each of the subtasks seem to be
> heavily skewed in terms of last modified time.
>  
> While each task may have a similar number of files, I find that one or two will have a
> disproportionate number of old files. This in turn holds back my watermark (sometimes for
> several hours, depending on the number of unread splits), which keeps timers from firing,
> windows from purging, etc.
>  
> I was hoping there was some way I could add a custom partitioner to ensure that splits
> are uniformly distributed in a temporal manner, or that someone had other ideas for how I
> could mitigate the problem.
>  
> Thank you, 
>  
> Seth Wiesman 
>  
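
A minimal, self-contained illustration of the chronological ordering described in the quoted
message (TimestampedFileSplit here is a hypothetical stand-in, not the actual custom type):

import java.util.Comparator;
import java.util.PriorityQueue;

public class SplitOrdering {

    // Hypothetical split type carrying the file's last-modified timestamp.
    static class TimestampedFileSplit {
        final String path;
        final long modificationTime;  // epoch millis

        TimestampedFileSplit(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    public static void main(String[] args) {
        PriorityQueue<TimestampedFileSplit> pending = new PriorityQueue<>(
            Comparator.comparingLong((TimestampedFileSplit s) -> s.modificationTime));

        pending.add(new TimestampedFileSplit("s3://bucket/2017/05/04/a.gz", 3_000L));
        pending.add(new TimestampedFileSplit("s3://bucket/2017/05/03/b.gz", 1_000L));
        pending.add(new TimestampedFileSplit("s3://bucket/2017/05/03/c.gz", 2_000L));

        // Drains in chronological order: b.gz, c.gz, a.gz.
        while (!pending.isEmpty()) {
            System.out.println(pending.poll().path);
        }
    }
}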

