flink-user mailing list archives

From Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
Subject Re: About stateful transformations
Date Thu, 27 Oct 2016 15:45:06 GMT
Hi Aljoscha,

Thanks for your answer. At least by keeping only the latest one we don't
have retention problems with the state backend, and for now I guess we
could use manually triggered savepoints if we needed to store the history
of the state.
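
To make the retention idea concrete (keep only the latest successful checkpoint, or the last n once that becomes configurable), here is a minimal sketch in plain Java. It is not Flink code and does not use any Flink API: the class `CheckpointRetention` and its methods are hypothetical names chosen only for illustration, and checkpoints are represented by bare numeric ids.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration of "retain the last n checkpoints"; not a Flink class. */
class CheckpointRetention {
    private final int maxRetained;
    // Oldest retained checkpoint id at the head, newest at the tail.
    private final Deque<Long> retained = new ArrayDeque<>();

    CheckpointRetention(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be >= 1");
        }
        this.maxRetained = maxRetained;
    }

    /** Records a successful checkpoint and returns the ids that can now be discarded. */
    List<Long> onCheckpointCompleted(long checkpointId) {
        retained.addLast(checkpointId);
        List<Long> toDelete = new ArrayList<>();
        while (retained.size() > maxRetained) {
            toDelete.add(retained.removeFirst());
        }
        return toDelete;
    }

    /** Returns the currently retained checkpoint ids, oldest first. */
    List<Long> retained() {
        return new ArrayList<>(retained);
    }

    public static void main(String[] args) {
        // maxRetained = 1 mirrors the behaviour described in this thread.
        CheckpointRetention latestOnly = new CheckpointRetention(1);
        for (long id = 1; id <= 3; id++) {
            System.out.println("completed " + id + ", discard " + latestOnly.onCheckpointCompleted(id));
        }
        System.out.println("retained " + latestOnly.retained());
    }
}
```

With `maxRetained = 1` every new successful checkpoint makes the previous one discardable, which is why storage does not grow without bound; anything that should outlive that window would have to be kept separately, for example as a manually triggered savepoint.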



On Tue, Oct 25, 2016 at 6:58 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

> Hi,
> there is already a mechanism for that. Currently, Flink will only keep the
> most recent, successful checkpoint. We are currently working on making that
> configurable so that, for example, the last n successful checkpoints can be
> kept.
> Cheers,
> Aljoscha
> On Tue, 25 Oct 2016 at 06:47 Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>> Hi Gyula,
>> Thanks a lot for your response, it was very clear. I understand that
>> there is no problem of small files due to checkpointing not being
>> incremental. I also understand that each worker will interpret a file://
>> URL as local to its own file system, which works ok if all workers have a
>> remote file system mounted at the same local path.
>> Now I have to check if Flink provides some expiration mechanism for old
>> checkpoints, although for S3 that is already available, and for HDFS I
>> guess some script that periodically deletes older files with hdfs dfs
>> -rmr should be easy enough. Is there any documentation about some naming
>> convention for checkpoint files that I could rely on to delete old
>> checkpoints? E.g. some naming scheme that uses dates; it would be nicer if
>> it were documented, because then it would be more stable.
>> Thanks again for your help.
>> Greetings,
>> Juan
>> On Mon, Oct 24, 2016 at 12:29 AM, Gyula Fóra <gyfora@apache.org> wrote:
>> Hi Juan,
>> Let me try to answer some of your questions :)
>> We have been running Flink Streaming at King for quite some time now with
>> multiple jobs having several hundred gigabytes of KV state stored in
>> RocksDB. I would say RocksDB state backend is definitely the best choice at
>> the moment for large deployments as you can also keep the heap relatively
>> small to save some time on GC. But you have to play around with the rocks
>> configuration to get the most out of it depending on your hardware.
>> I am not aware of any caching/TTL functionality exposed in the Flink APIs
>> currently. But if you are willing to dig a little deeper you could
>> implement a lower-level operator that uses timers like the windowing
>> mechanisms to clear state after some time.
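
The timer-based clearing suggested above can be pictured with a small simulation. This is a sketch in plain Java under stated assumptions, not Flink operator code: the class `ExpiringKeyedState`, its methods, and the explicit `now` arguments are all hypothetical, and time is advanced by hand instead of by a real timer service. Each update registers a timer at `now + ttl`; when a timer fires, the key is cleared only if it has not been refreshed since that timer was registered.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical simulation of clearing keyed state with timers; not a Flink class. */
class ExpiringKeyedState<K, V> {
    private final long ttlMillis;
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> lastUpdate = new HashMap<>();
    // Registered timers: firing time -> keys to check at that time.
    private final TreeMap<Long, Set<K>> timers = new TreeMap<>();

    ExpiringKeyedState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores a value and registers a cleanup timer ttlMillis after the update. */
    void update(K key, V value, long now) {
        values.put(key, value);
        lastUpdate.put(key, now);
        timers.computeIfAbsent(now + ttlMillis, t -> new HashSet<>()).add(key);
    }

    /** Returns the stored value, or null if the key is absent or was cleared. */
    V get(K key) {
        return values.get(key);
    }

    /** Fires every timer due at or before the given time. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<K>> due = timers.pollFirstEntry();
            for (K key : due.getValue()) {
                Long updated = lastUpdate.get(key);
                // Skip stale timers: the key was refreshed after this timer was set.
                if (updated != null && updated + ttlMillis <= due.getKey()) {
                    values.remove(key);
                    lastUpdate.remove(key);
                }
            }
        }
    }

    public static void main(String[] args) {
        ExpiringKeyedState<String, Integer> state = new ExpiringKeyedState<>(100);
        state.update("a", 1, 0);
        state.update("a", 2, 80);   // refresh: the timer set for t=100 becomes stale
        state.advanceTo(100);
        System.out.println("a after t=100: " + state.get("a"));
        state.advanceTo(180);
        System.out.println("a after t=180: " + state.get("a"));
    }
}
```

Checking the last update time when a timer fires avoids having to cancel timers on every refresh; the cost is that stale timers stay registered until they fire.
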
>> When you are selecting a checkpoint directory (URL) you need to make sure
>> that it is accessible from all the task managers. HDFS is convenient but is
>> not strictly necessary. We for instance use CEPH that is mounted as a
>> regular disk from the OS's perspective so we can use file:// and still save
>> to the distributed storage. As far as I know, using YARN doesn't give much
>> benefit; I am not sure if Flink exploits any data locality at this moment.
>> When you are running the RocksDB state backend there are two concepts you
>> need to think about for checkpointing: your local RocksDB directory and
>> the checkpoint directory. The local directory is where the RocksDB instances
>> are created, and they live on the taskmanager's local disk/memory. When Flink
>> takes a checkpoint, a backup of all K-V pairs is copied as one blob to HDFS
>> or to the selected checkpoint directory. This means there is no data
>> fragmentation in the checkpoints. The same applies to the FsStateBackend, but
>> that keeps the local state strictly in memory.
>> I think you should definitely give RocksDB + HDFS a try. It works
>> extremely well for very large state sizes given some tuning, but should
>> also perform out-of-the-box :)
>> Cheers,
>> Gyula
>> Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com> wrote
>> (on Sun, Oct 23, 2016, 22:29):
>> Hi all,
>> I don't have much experience with Flink, so please forgive me if I ask
>> some obvious questions. I was taking a look at the documentation
>> on stateful transformations in Flink at
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html. I'm mostly
>> interested in Flink for stream processing, and I would like to know:
>> - What is the biggest state that has been used in production deployments?
>> I'm interested in how many GB of state, among all key-value pairs, have
>> been used before in long running streaming jobs deployed in production.
>> Maybe some company has shared this in some conference or blog post. I guess
>> for that RocksDB backend is the best option for big states, to avoid being
>> limited by the available memory.
>> - Is there any pre-built functionality for state eviction? I'm thinking
>> of LRU cache-like behavior, with eviction based on time or size, similar to
>> Guava cache (https://github.com/google/guava/wiki/CachesExplained). This
>> is probably easy to implement anyway, by using the clear() primitive, but I
>> wouldn't like to reinvent the wheel if this is already implemented
>> somewhere.
>> - When using file:// for the checkpointing URL, is the data replicated in
>> the workers, or does a failure in a worker lead to losing the state stored in
>> that worker? I guess with hdfs:// we get the replication of HDFS, and we
>> don't have that problem. I also guess S3 can be used for checkpointing
>> state too; is there any notable performance impact of using S3 instead
>> of HDFS for checkpointing? I guess some performance is lost compared to a
>> setup running in YARN with collocated DataNodes and NodeManagers, but I
>> wanted to know if the impact is negligible, as checkpointing is performed at a
>> relatively low frequency. Also I'm interested in Flink running on EMR,
>> where the impact of this should be even smaller because the access to S3 is
>> faster from EMR than from an in-house YARN cluster outside the AWS cloud.
>> - Is there any problem with the RocksDB backend on top of HDFS related to
>> fragmentation? How is clear() handled for long running jobs? I'm thinking
>> of a streaming job that has a state with a size of several hundred GBs,
>> where each key-value pair is stored for a week and then deleted. How does clear()
>> work, and how do you deal with the "small files problem" of HDFS (
>> http://inquidia.com/news-and-info/working-small-files-hadoop-part-1) for
>> the FsState and RocksDB backends on top of HDFS? I guess this wouldn't be a
>> problem for S3, as it is an object store that has no problem with small
>> files.
>> Thanks a lot for your help!
>> Greetings,
>> Juan Rodriguez Hortala
