flink-dev mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Streaming KV store abstraction
Date Wed, 23 Mar 2016 14:38:13 GMT
Hi!

Sorry for the late answer; I completely missed this email. (Thanks, Robert,
for pointing it out.)

You won't be able to use that project, as it depended on an earlier
snapshot version that still had completely different state semantics.
I don't think it is realistic that I will re-implement this any time soon,
but I think you can easily do what you want in the following way:

Let's say you have 2 streams. The first contains the enrichment data per
key: enrichments = DataStream<Tuple2<key, state>>.
The second is the event stream that you want to enrich: events =
DataStream<Tuple2<key, event>>.

To apply the enrichments, the easiest way is to use a CoFlatMap with a
partitioned value state inside:

events.connect(enrichments).keyBy(0,0).flatMap(new YourCoFlatMap())

In this case, if you declare a value state inside YourCoFlatMap, it will be
kept per key. For example, in the open method (available when YourCoFlatMap
extends RichCoFlatMapFunction):
state = getRuntimeContext().getState(new ValueStateDescriptor("stateName",
type, defaultValue))

Now that you have everything set up, in flatMap1 (for events) you would
query the state with state.value() and enrich your data, and in flatMap2
(for enrichments) you would update the state with state.update(newState).
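
A minimal, Flink-free sketch of this pattern in plain Java, assuming String
keys, events and enrichment values. The HashMap only stands in for the
partitioned value state that Flink would keep per key; EnrichmentModel and
its output format are made up for illustration and are not Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of a CoFlatMap with per-key value state.
 * Hypothetical class: it mimics the semantics only, it is not Flink code.
 */
class EnrichmentModel {
    // Stand-in for the keyed value state: one slot per key.
    private final Map<String, String> stateByKey = new HashMap<>();
    // Returned when no enrichment has been seen for a key yet.
    private final String defaultValue;

    EnrichmentModel(String defaultValue) {
        this.defaultValue = defaultValue;
    }

    /** Event side (flatMap1): read the state for the key and emit the enriched record. */
    void flatMap1(String key, String event, List<String> out) {
        String enrichment = stateByKey.getOrDefault(key, defaultValue); // like state.value()
        out.add(key + ":" + event + ":" + enrichment);
    }

    /** Enrichment side (flatMap2): overwrite the state for the key, emit nothing. */
    void flatMap2(String key, String newState) {
        stateByKey.put(key, newState); // like state.update(newState)
    }

    public static void main(String[] args) {
        EnrichmentModel op = new EnrichmentModel("unknown");
        List<String> out = new ArrayList<>();
        op.flatMap1("berlin", "login", out);  // no enrichment seen yet for this key
        op.flatMap2("berlin", "DE");          // enrichment arrives
        op.flatMap1("berlin", "logout", out); // now enriched with the stored value
        op.flatMap1("paris", "login", out);   // different key, separate state
        System.out.println(out);
        // prints [berlin:login:unknown, berlin:logout:DE, paris:login:unknown]
    }
}
```

Note that events arriving before the first enrichment for their key only see
the default value, so the ordering between the two streams matters.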

Does this make sense to you? Or is the use case completely different?

Cheers,
Gyula

Nam-Luc Tran <namluc.tran@euranova.eu> wrote (on Fri, 18 Mar 2016, 18:25):

> Hi Gyula,
>
> I'm currently looking into ways to enrich streams with external data. Have
> you got any update on the topic in general or on StreamKV?
>
> I've checked out the code but it won't build, mainly because
> StateCheckpointer has been removed since [FLINK-2808]. Any hint on a quick
> replacement, before I dive in deeper?
>
> Cheers,
>
> 2015-09-15 20:29 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>
> > I think that is actually a cool way to kick off an addition to the system.
> > Gives you a lot of flexibility in releasing and testing...
> >
> > It helps, though, to upload maven artifacts for it!
> >
> > On Tue, Sep 15, 2015 at 7:18 PM, Gyula Fóra <gyfora@apache.org> wrote:
> >
> > > Hey All,
> > >
> > > We decided to make this a standalone library until it is stable enough,
> > > and then we can decide whether we want to keep it like that or include it
> > > in the project:
> > >
> > > https://github.com/gyfora/StreamKV
> > >
> > > Cheers,
> > > Gyula
> > >
> > > Gianmarco De Francisci Morales <gdfm@apache.org> wrote (on Wed, 9 Sep 2015, 20:25):
> > >
> > > > Yes, pretty clear. I guess semantically it's still a co-group, but
> > > > implemented slightly differently.
> > > >
> > > > Thanks!
> > > >
> > > > --
> > > > Gianmarco
> > > >
> > > > On 9 September 2015 at 15:37, Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > >
> > > > > Hey Gianmarco,
> > > > >
> > > > > So the implementation looks somewhat different:
> > > > >
> > > > > The update stream is received by a stateful KVStoreOperator, which
> > > > > stores the K-V pairs as its partitioned state.
> > > > >
> > > > > The query for the 2 cities is assigned an ID, yes, and is split into the
> > > > > 2 cities, and each of these is sent to the same KVStoreOperator as the
> > > > > update stream. The output is the value for each key, practically (qid,
> > > > > city1, temp1), which is retrieved from the operator state, and this
> > > > > output is merged in a next operator to form the KV[] output, on which
> > > > > the user can execute the difference if they want.
> > > > >
> > > > > So actually no co-group is happening, although semantically it might be
> > > > > similar. Instead I use stateful operators to be much more efficient.
> > > > >
> > > > > Does this answer your question?
> > > > >
> > > > > Gyula
> > > > >
> > > > > Gianmarco De Francisci Morales <gdfm@apache.org> wrote (on Wed, 9 Sep 2015, 14:29):
> > > > >
> > > > > > Just a silly question.
> > > > > > For the example you described, in a data flow model, you would do
> > > > > > something like this:
> > > > > >
> > > > > > Have query ids added to the city pairs (qid, city1, city2),
> > > > > > then split the query stream on the two cities and co-group it with the
> > > > > > updates stream ((city1, qid), (city, temp)), same for city2,
> > > > > > then emit (qid, city1, temp1), (qid, city2, temp2) from the two
> > > > > > co-groups, group on the qid, and apply a difference operator to get the
> > > > > > final answer.
> > > > > >
> > > > > > Is your idea to implement a way to generalize this logic, or would it
> > > > > > use remote read/write to a KV-store?
> > > > > >
> > > > > > --
> > > > > > Gianmarco
> > > > > >
> > > > > > On 8 September 2015 at 16:27, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > >
> > > > > > > That's a very nice application of the Stream API and partitioned
> > > > > > > state. :D
> > > > > > >
> > > > > > > I think we should run some tests on a cluster based on this to see
> > > > > > > what kind of throughput the partitioned state system can handle and
> > > > > > > also how it behaves with larger numbers of keys. The KVStore is just
> > > > > > > an interface and the really heavy lifting is done by the state system,
> > > > > > > so this would be a good test for it.
> > > > > > >
> > > > > > >
> > > > > > > On Tue, 8 Sep 2015 at 15:10 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > > > > >
> > > > > > > > @Stephan:
> > > > > > > >
> > > > > > > > Technically speaking this is really just a partitioned key-value
> > > > > > > > state and a fancy operator executing special operations on this
> > > > > > > > state.
> > > > > > > >
> > > > > > > > From the user's perspective, though, this is something hard to
> > > > > > > > implement. If you want to share state between two streams this way,
> > > > > > > > for instance (getting updates from one stream and enriching the other
> > > > > > > > one), you would probably use a connected datastream and custom
> > > > > > > > implement the key-value store logic. But once you have one (or more)
> > > > > > > > update streams and many get streams, this implementation will not
> > > > > > > > work. So the user either ends up replicating the whole state in
> > > > > > > > multiple connected operators, or custom implements some inefficient
> > > > > > > > wrapper class to take care of all the put/get operations.
> > > > > > > >
> > > > > > > > The idea behind this is to give a very simple abstraction for this
> > > > > > > > type of processing that uses the Flink runtime efficiently instead of
> > > > > > > > relying on custom implementations.
> > > > > > > >
> > > > > > > > Let me give you a stupid example:
> > > > > > > >
> > > > > > > > You receive temperature data in the form of (city, temperature), and
> > > > > > > > you are computing a rolling avg for each city.
> > > > > > > > Now you have 2 other incoming streams: the first is a stream of some
> > > > > > > > other info about the city, let's say population (city, population),
> > > > > > > > and you want to combine it with the last known avg temperature to
> > > > > > > > produce (city, temp, pop) triplets. The second stream is a pair of
> > > > > > > > cities (city, city), and you are interested in the difference of the
> > > > > > > > temperatures.
> > > > > > > >
> > > > > > > > For enriching the (city, pop) to (city, temp, pop) you would probably
> > > > > > > > use a CoFlatMap and store the last known rolling avg as state. For
> > > > > > > > computing the (city, city) temperature difference it is a little more
> > > > > > > > difficult, as you need to get the temperature for both cities and then
> > > > > > > > combine them in a second operator. If you don't want to replicate your
> > > > > > > > state, you have to combine these two problems into a common wrapper
> > > > > > > > type and execute them on the same operator, which will keep the avg
> > > > > > > > state.
> > > > > > > >
> > > > > > > > With the KVStore abstraction this is very simple:
> > > > > > > > you create a KVStore<City, Temp>.
> > > > > > > > For enriching you use kvStore.getWithKeySelector(), which will give
> > > > > > > > you ((city, pop), temp) pairs, and you are done. For computing the
> > > > > > > > difference, you can use kvStore.multiget(...) and get for the 2 cities
> > > > > > > > at the same time. The KV store will abstract away the getting of the 2
> > > > > > > > keys separately and merging them, so it will return [(city1, t1),
> > > > > > > > (city2, t2)].
> > > > > > > >
> > > > > > > > This might be a slightly artificial example, but I think it makes the
> > > > > > > > point. Implementing these jobs efficiently is not trivial for the
> > > > > > > > users, but I think it is a very common problem.
> > > > > > > >
> > > > > > > > Stephan Ewen <sewen@apache.org> wrote (on Tue, 8 Sep 2015, 14:53):
> > > > > > > >
> > > > > > > > > @Gyula
> > > > > > > > >
> > > > > > > > > Can you explain a bit what this KeyValue store would do more than
> > > > > > > > > the partitioned key/value state?
> > > > > > > > >
> > > > > > > > > On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <ggab90@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hello,
> > > > > > > > > >
> > > > > > > > > > As for use cases, in my old job at Ericsson we were building a
> > > > > > > > > > streaming system that was processing data from telephone networks,
> > > > > > > > > > and it was using key-value stores a LOT. For example, keeping track
> > > > > > > > > > of various state info of the users (which cell they are currently
> > > > > > > > > > connected to, what bearers they have, ...); mapping from IDs of
> > > > > > > > > > users in one subsystem of the telephone network to the IDs of the
> > > > > > > > > > same users in another subsystem; mapping from IDs of phone calls to
> > > > > > > > > > lists of IDs of participating users; etc.
> > > > > > > > > > So I imagine they would like this a lot. (At least, if they were
> > > > > > > > > > considering moving to Flink :))
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Gabor
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > 2015-09-08 13:35 GMT+02:00 Gyula Fóra <gyfora@apache.org>:
> > > > > > > > > > > Hey All,
> > > > > > > > > > >
> > > > > > > > > > > The last couple of days I have been playing around with the idea
> > > > > > > > > > > of building a streaming key-value store abstraction using stateful
> > > > > > > > > > > streaming operators that can be used within Flink Streaming
> > > > > > > > > > > programs seamlessly.
> > > > > > > > > > >
> > > > > > > > > > > Operations executed on this KV store would be fault tolerant as it
> > > > > > > > > > > integrates with the checkpointing mechanism, and if we add
> > > > > > > > > > > timestamps to each put/get/... operation we can use the watermarks
> > > > > > > > > > > to create fully deterministic results. This functionality is very
> > > > > > > > > > > useful for many applications, and is very hard to implement
> > > > > > > > > > > properly with some dedicated KV store.
> > > > > > > > > > >
> > > > > > > > > > > The KVStore abstraction could look as follows:
> > > > > > > > > > >
> > > > > > > > > > > KVStore<K,V> store = new KVStore<>();
> > > > > > > > > > >
> > > > > > > > > > > Operations:
> > > > > > > > > > >
> > > > > > > > > > > store.put(DataStream<Tuple2<K,V>>)
> > > > > > > > > > > store.get(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > > > > > > > store.remove(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > > > > > > > store.multiGet(DataStream<K[]>) -> DataStream<KV<K,V>[]>
> > > > > > > > > > > store.getWithKeySelector(DataStream<X>, KeySelector<X,K>) -> DataStream<KV<X,V>[]>
> > > > > > > > > > >
> > > > > > > > > > > For the resulting streams I used a special KV abstraction which
> > > > > > > > > > > lets us return null values.
> > > > > > > > > > >
> > > > > > > > > > > The implementation uses a simple streaming operator for executing
> > > > > > > > > > > most of the operations (for multi get there is an additional merge
> > > > > > > > > > > operator) with either local or partitioned states for storing the
> > > > > > > > > > > key-value pairs (my current prototype uses local states). And it
> > > > > > > > > > > can either execute operations eagerly (which would not provide
> > > > > > > > > > > deterministic results), or buffer up operations and execute them
> > > > > > > > > > > in order upon watermarks.
> > > > > > > > > > >
> > > > > > > > > > > As for use cases, you can probably come up with many; I will save
> > > > > > > > > > > that for now :D
> > > > > > > > > > >
> > > > > > > > > > > I have a prototype implementation here that can execute the
> > > > > > > > > > > operations described above (does not handle watermarks and time
> > > > > > > > > > > yet):
> > > > > > > > > > >
> > > > > > > > > > > https://github.com/gyfora/flink/tree/KVStore
> > > > > > > > > > > And also an example job:
> > > > > > > > > > >
> > > > > > > > > > > https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > > If you like it I will work on writing tests; it still needs a lot
> > > > > > > > > > > of tweaking and refactoring. This might be something we want to
> > > > > > > > > > > include with the standard streaming libraries at some point.
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Gyula
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
>
> --
>
> *Nam-Luc TRAN*
>
> R&D Manager
>
> EURA NOVA
>
> (M) +32 498 37 36 23
>
> *euranova.eu <http://euranova.eu>*
>
