flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Apache Flink and serious streaming stateful processing
Date Fri, 16 Oct 2015 14:20:51 GMT

As Gyula mentioned an upcoming Pull Request will make the state backend
pluggable. We would like to add the following state holders into Flink:

(1) Small state in memory (local execution / debugging) : State maintained
in a heap hash map, checkpoints to JobManager. This is in there now.

(2) Large state in memory - State maintained as a heap hash map,
checkpoints to distributed file system. This is in there now as well.

(3) Large state in-memory/out of core, (incremental) checkpoints to file

(4) State maintained externally, hot keys cached in Flink. This is what
Gyula is working on right now.

I think point (3) is what you are looking for.

There are actually two different variants we could attempt that (3.1 and
3.2), where (3.1) would have the state in an out-of-core index in Flink and
(3.2) would use an embedded LevelDB or so to store the key/value pairs.

What we have by now is the pluggable back end. I would like to start with
(3) after the 0.10 release is out, which should be very soon, we have begun
the fixing and testing phase.
How fast a prototype of this state backend is available depends a bit on
how well MapDB, LevelDB or any of those can be embedded for our case. I
hope it is pretty soon :-)


On Thu, Oct 15, 2015 at 12:55 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:

> Hi!
> You have an interesting use case that I think comes up in many
> applications (in fact I will be working on something very similar shortly).
> Stephan has made some nice changes (this
> <https://github.com/apache/flink/pull/1239> PR) to the State interfaces
> supporting flexible backends, which can be used to implement this
> functionality. While the new state backends support more efficient
> checkpoints for KV states it still does not support incremental snapshots
> and keeping out-of-core states.
> My current plan is to implement a caching layer on top of the storage
> layer (Cassandra in your case) which will be used to only keep the "hot"
> keys in the Flink streaming operators and would evict the cold keys to the
> external storage on checkpoints.
> Some things we need to worry about this case is that we should only
> overwrite elements in our storage when we know that a checkpoint is
> complete. (we can use checkpointnotifications for this) This assumes some
> sort of versioning in the storage layer.
> I think this will definitely not make it to 0.10, but I am confident that
> we will have something working for the next (1.0) release. I suggest you
> try experimenting with the logic, maybe you can get something working much
> quicker :)
> Cheers,
> Gyula
> Krzysztof Zarzycki <k.zarzycki@gmail.com> ezt írta (időpont: 2015. okt.
> 14., Sze, 11:02):
>> Hi guys!
>> I'm sorry I have abandoned this thread but I had to give up Flink for
>> some time. Now I'm back and would like to resurrect this thread. Flink has
>> rapidly evolved in this time too, so maybe new features will allow me what
>> I want to do. By the way, I heard really only good stuff about you from
>> Flink Forward conference!
>> First, about back-pressure. As you said, it is working well so I'm taking
>> it as granted. Sounds great!
>> Let's focus now on stateful processing:
>> To back up what I mean, I'm citing some numbers of the state I'm
>> currently holding:
>> My stream processing program keeps around 300GB in 1 month state, but it
>> will be holding around 2 months, so twice as much (600 GB). The state is
>> key-value store, where key is some user id & value is actually a list of
>> events correlated with the user. There are tens of millions of keys -
>> unique user ids. The stream is partitioned on user id, so my state can be
>> partitioned on user id as well.
>> Currently I keep this "state" in Cassandra, so externally to the program,
>> but this is my biggest pain as the communication cost is large, especially
>> when I do reprocessing of my streaming data.
>> Now what I would like to have is some abstraction available in Flink,
>> that allows me to keep the state out-of-core, but embedded. I would use it
>> as key-value store and Flink will journal & replicate all the update
>> operations, so they are recoverable on failure, when the state (or its
>> partition) is lost.
>> To describe my idea in code, I imagine the following pseudocode (totally
>> abstracted from Flink):
>> class MyProcessor {
>>   val keyValueState = Flink.createKeyValueState("name-it")
>>   def processRecord(r: Record) {
>>      val userList = keyValueState.get(r.get("userId"))
>>      userList += r.get("someData")
>>      keyValueState.put(r.get("userId"), userList)
>>   }
>> }
>> Something similar is in Samza, with grants:
>> - all puts are replicated (by saving the put in separate Kafka topic).
>> - on failure & recover, the state is recovered from the saved puts,
>> before starting the processing.
>> Last time, you said that you're "working on making incrementally
>> backed-up key/value state a first-class citizen in Flink, but is is
>> still WIP".  How this change since then? Do you already support the case
>> that I just described?
>> Thanks for the idea of MapDB. I couldn't  find any benchmark of MapDB
>> out-of-core performance , and I don't know yet if it can match performance
>> of RocksDB-like database, but I will try to find time to check it.
>> In meantime, this is the performance that attracts me to RocksDb:
>>> Measure performance to load 1B keys into the database. The keys are
>>> inserted in random order.
>>>  rocksdb: 103 minutes, 80 MB/sec (total data size 481 GB, 1 billion
>>> key-values)
>>> Measure performance to load 1B keys into the database. The keys are
>>> inserted in sequential order.
>>> rocksdb: 36 minutes, 370 MB/sec (total data size 760 GB)
>> [1] https://github.com/facebook/rocksdb/wiki/Performance-Benchmarks
>> Cheers!
>> Krzysiek
>> 2015-06-30 15:00 GMT+02:00 Ufuk Celebi <uce@apache.org>:
>>> On 30 Jun 2015, at 14:23, Gyula Fóra <gyula.fora@gmail.com> wrote:
>>> > 2. We have support for stateful processing in Flink in many ways you
>>> have described in your question. Unfortunately the docs are down currently
>>> but you should check out the 'Stateful processing' section in the 0.10 docs
>>> (once its back online). We practically support an OperatorState interface
>>> which let's you keep partitioned state by some key and access it from
>>> runtime operators. The states declared using these interfaces are
>>> checkpointed and will be restored on failure. Currently all the states are
>>> stored in-memory but we are planning to extend it to allow writing state
>>> updates to external systems.
>>> http://flink.apache.org/docs/master/apis/streaming_guide.html#stateful-computation

View raw message