flink-user mailing list archives

From Krzysztof Zarzycki <k.zarzy...@gmail.com>
Subject Re: Apache Flink and serious streaming stateful processing
Date Wed, 14 Oct 2015 09:01:36 GMT
Hi guys!
I'm sorry I abandoned this thread, but I had to put Flink aside for some
time. Now I'm back and would like to resurrect it. Flink has evolved
rapidly in the meantime, so maybe new features will let me do what I
want. By the way, I heard only good things about you from the
Flink Forward conference!

First, about back-pressure. As you said, it works well, so I'm taking
it for granted. Sounds great!

Let's focus now on stateful processing:

To back up what I mean, here are some numbers on the state I'm currently
holding:
My stream processing program keeps around 300 GB of state for 1 month of
data, but it will need to hold around 2 months, so twice as much (600 GB).
The state is a key-value store, where the key is a user id and the value is
a list of events correlated with that user. There are tens of millions of
keys (unique user ids). The stream is partitioned on user id, so my state
can be partitioned on user id as well.
Currently I keep this "state" in Cassandra, so external to the program,
but this is my biggest pain because the communication cost is large,
especially when I reprocess my streaming data.
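
To illustrate why partitioning the stream on user id lets the state be partitioned the same way, here is a minimal sketch. The names `PartitionedState`, `partitionFor`, and `process` are hypothetical and are not part of Flink; the in-memory maps simply stand in for per-partition state.

```scala
import scala.collection.mutable

object PartitionedState {
  // Assumed partition count, chosen only for illustration.
  val numPartitions = 4

  // One independent slice of state per partition.
  val statePartitions: Vector[mutable.HashMap[String, Vector[String]]] =
    Vector.fill(numPartitions)(mutable.HashMap.empty[String, Vector[String]])

  // Hypothetical helper: maps a user id to a partition index in [0, n).
  def partitionFor(userId: String, n: Int): Int =
    Math.floorMod(userId.hashCode, n)

  // A record for a given user always lands in the same partition, so that
  // partition alone needs to hold the user's event list.
  def process(userId: String, event: String): Unit = {
    val slice = statePartitions(partitionFor(userId, numPartitions))
    slice.update(userId, slice.getOrElse(userId, Vector.empty) :+ event)
  }
}
```

Since the same function routes both the records and the state lookups, no partition ever needs to read another partition's state.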

What I would like to have is an abstraction in Flink that lets me keep
the state out-of-core, but embedded. I would use it as a key-value store,
and Flink would journal and replicate all update operations so that they
are recoverable on failure, when the state (or one of its partitions) is
lost.
To describe my idea in code, I imagine the following pseudocode (totally
abstracted from Flink):
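class MyProcessor {
  // Hypothetical API: an embedded, out-of-core key-value store managed by Flink
  val keyValueState = Flink.createKeyValueState("name-it")

  def processRecord(r: Record): Unit = {
    val userId = r.get("userId")
    // Append the new event to this user's list and write the list back
    val userList = keyValueState.get(userId)
    userList += r.get("someData")
    keyValueState.put(userId, userList)
  }
}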

Samza has something similar, with these guarantees:
- all puts are replicated (by saving each put in a separate Kafka topic).
- on failure and recovery, the state is restored from the saved puts before
processing starts.
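
The two guarantees above can be sketched as follows. This is only an illustration under stated assumptions: `Put`, `Changelog`, and `LoggedKeyValueState` are hypothetical names, not Samza's or Flink's API, and an in-memory buffer stands in for the durable Kafka topic.

```scala
import scala.collection.mutable

// One journaled update: the full new value for a key (hypothetical type).
final case class Put(key: String, value: Vector[String])

// Stand-in for a durable, replicated log such as a Kafka topic.
final class Changelog {
  private val entries = mutable.ArrayBuffer.empty[Put]
  def append(p: Put): Unit = entries += p
  def replay: Iterator[Put] = entries.iterator
}

// Local key-value state in which every put is journaled before being applied.
final class LoggedKeyValueState(log: Changelog) {
  private val local = mutable.HashMap.empty[String, Vector[String]]

  // On construction, rebuild local state from whatever the log already
  // holds, so processing only starts after recovery is complete.
  log.replay.foreach(p => local.update(p.key, p.value))

  def get(key: String): Vector[String] = local.getOrElse(key, Vector.empty)

  def put(key: String, value: Vector[String]): Unit = {
    log.append(Put(key, value)) // journal first
    local.update(key, value)    // then apply locally
  }
}
```

Creating a new `LoggedKeyValueState` over the same `Changelog` after a failure yields the state as of the last journaled put. Journaling the whole list on every put keeps the sketch short; with long per-user event lists, journaling only the appended event would presumably be cheaper.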


Last time, you said that you're "working on making incrementally backed-up
key/value state a first-class citizen in Flink, but it is still WIP". How
has this changed since then? Do you already support the case that I just
described?


Thanks for the idea of MapDB. I couldn't find any benchmark of MapDB's
out-of-core performance, and I don't know yet whether it can match the
performance of a RocksDB-like database, but I will try to find time to
check. In the meantime, this is the performance that attracts me to
RocksDB [1]:

> Measure performance to load 1B keys into the database. The keys are
> inserted in random order.
>  rocksdb: 103 minutes, 80 MB/sec (total data size 481 GB, 1 billion
> key-values)
> Measure performance to load 1B keys into the database. The keys are
> inserted in sequential order.
> rocksdb: 36 minutes, 370 MB/sec (total data size 760 GB)


[1] https://github.com/facebook/rocksdb/wiki/Performance-Benchmarks

Cheers!
Krzysiek

2015-06-30 15:00 GMT+02:00 Ufuk Celebi <uce@apache.org>:

>
> On 30 Jun 2015, at 14:23, Gyula Fóra <gyula.fora@gmail.com> wrote:
> > 2. We have support for stateful processing in Flink in many ways you
> have described in your question. Unfortunately the docs are down currently
> but you should check out the 'Stateful processing' section in the 0.10 docs
> (once it's back online). We practically support an OperatorState interface
> which lets you keep partitioned state by some key and access it from
> runtime operators. The states declared using these interfaces are
> checkpointed and will be restored on failure. Currently all the states are
> stored in-memory but we are planning to extend it to allow writing state
> updates to external systems.
>
>
> http://flink.apache.org/docs/master/apis/streaming_guide.html#stateful-computation
