flink-user mailing list archives

From aris kol <gizera...@hotmail.com>
Subject Re: Accessing state in connected streams
Date Sat, 27 Aug 2016 23:27:25 GMT
Hi Sameer,


Thank you for your quick response.


I don't think event ordering is the problem here; the processor doesn't assume any ordering.

KeyedStream[EventA] stores a state of type Set[InfoA] on its key, which I would like KeyedStream[EventB]
to access.

The code operates on an Option[Set[InfoA]] without inviting trouble by invoking .get.
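
For illustration, handling the state as an Option without ever calling .get might look like the sketch below. It is plain Scala with no Flink dependency; `processB`, `Enriched`, and the `String` stand-in for `Id` are hypothetical names, not taken from the actual job. The function shape (event and optional state in, outputs and new optional state out) is meant to resemble the shape that stateful helpers such as flatMapWithState expect.

```scala
object OptionStateSketch {
  // Hypothetical stand-ins for the types in the original question.
  type Id = String
  final case class InfoA(value: String)
  final case class InfoB(value: String)
  final case class EventB(id: Id, info: InfoB)

  // Hypothetical output type: a B event paired with the A info seen so far.
  final case class Enriched(id: Id, infoB: InfoB, infosA: Set[InfoA])

  // Pattern match on the Option instead of calling .get, so that a
  // missing state yields no output rather than an exception.
  def processB(
      b: EventB,
      state: Option[Set[InfoA]]
  ): (Seq[Enriched], Option[Set[InfoA]]) =
    state match {
      case Some(infos) if infos.nonEmpty =>
        (Seq(Enriched(b.id, b.info, infos)), state)
      case _ =>
        // No state for this key yet: emit nothing, leave state untouched.
        (Seq.empty, state)
    }
}
```

With this shape, a B event that arrives before any A event for the same key produces an empty result instead of failing.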

applyWithState throws the exception because the private ValueState[S] is never initialised.

Since KeyedStream[EventA] successfully updates the state, it could be that:

- Something is misconfigured in SomeRichFlatMapFunctionForEventB, which is fine and can be
fixed

- I am doing something completely wrong that Flink doesn't support.


Thanks,

Aris


________________________________
From: Sameer W <sameer@axiomine.com>
Sent: Saturday, August 27, 2016 10:17 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

There is no guarantee about the order in which the elements of each stream arrive in connected
streams. You have to check whether the elements from stream A have arrived before using that
information to process elements from stream B. Otherwise you have to buffer elements from stream B and
check whether there are unprocessed elements from stream B when elements arrive from stream A.
You might need to do that for elements from both streams, depending on how you use them.

You will get an NPE if you assume events from A have already arrived when they may in fact be lagging behind.
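
The buffering described above could be sketched roughly as follows. This is plain Scala with no Flink dependency, so it only models the logic; `Joiner`, `onA`, `onB`, `Enriched`, and the `String` stand-in for `Id` are hypothetical names chosen for illustration. In a Flink job the same logic would presumably live in the two callbacks of a single co-flatMap function over the connected, keyed streams, with the two maps replaced by keyed state.

```scala
object BufferingSketch {
  // Hypothetical stand-ins for the types in the original question.
  type Id = String
  final case class InfoA(value: String)
  final case class InfoB(value: String)
  final case class EventA(id: Id, info: InfoA)
  final case class EventB(id: Id, info: InfoB)

  // Hypothetical output type: a B event paired with the A info known so far.
  final case class Enriched(id: Id, infoB: InfoB, infosA: Set[InfoA])

  final class Joiner {
    // Per-key info collected from stream A.
    private var stateA = Map.empty[Id, Set[InfoA]]
    // Per-key B events that arrived before any A event for that key.
    private var pendingB = Map.empty[Id, Vector[EventB]]

    // Called for each element of stream A: update the state, then
    // release any B events that were waiting on this key.
    def onA(a: EventA): Vector[Enriched] = {
      val infos = stateA.getOrElse(a.id, Set.empty[InfoA]) + a.info
      stateA = stateA.updated(a.id, infos)
      val waiting = pendingB.getOrElse(a.id, Vector.empty)
      pendingB = pendingB - a.id
      waiting.map(b => Enriched(b.id, b.info, infos))
    }

    // Called for each element of stream B: emit if state exists,
    // otherwise buffer the event instead of dereferencing missing state.
    def onB(b: EventB): Vector[Enriched] =
      stateA.get(b.id) match {
        case Some(infos) => Vector(Enriched(b.id, b.info, infos))
        case None =>
          val queued = pendingB.getOrElse(b.id, Vector.empty) :+ b
          pendingB = pendingB.updated(b.id, queued)
          Vector.empty
      }
  }
}
```

The point of the design is that neither callback assumes the other stream has been seen first: a B event with no matching A state is parked rather than processed against a missing value.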

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizerakos@hotmail.com> wrote:

Let's say I have two types sharing the same trait

trait Event {
  def id: Id
}

case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event

Each of these events gets pushed to a Kafka topic and gets consumed by a stream in Flink.

Let's say I have two streams

Events of type A create state:

val typeAStream = env.addSource(...)
  .flatMap(someUnmarshallerForA)
  .keyBy(_.id)
  .mapWithState(...)

val typeBStream = env.addSource(...)
  .flatMap(someUnmarshallerForB)
  .keyBy(_.id)

I now want to process the events in typeBStream using the information stored in the state
of typeAStream.

One approach would be to use the same stream for the two topics and then pattern match, but
Event subclasses may grow in number and
may have different loads, so I may want to keep things separate.

Would something along the lines of:

typeAStream.connect(typeBStream)
  .flatMap(
    new IdentityFlatMapFunction(),
    new SomeRichFlatMapFunctionForEventB[EventB, O] with StatefulFunction[EventB, O, G[EventA]]
    { ... }
  )

work?

I tried this approach and ended up with an NPE because the state object was not initialized
(meaning it was not there).


Thanks,
Aris


