flink-user mailing list archives

From aris kol <gizera...@hotmail.com>
Subject Re: Accessing state in connected streams
Date Sun, 28 Aug 2016 00:04:08 GMT
In the implementation I am passing just one CoFlatMapFunction: flatMap1, which operates on EventA, just emits a None (it does practically nothing), and flatMap2 tries to access the state and throws the NPE.

It wouldn't make sense to use a mapper in this context; I would still want to flatten the results afterwards before pushing them downstream.


Aris


________________________________
From: Sameer W <sameer@axiomine.com>
Sent: Saturday, August 27, 2016 11:40 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

OK, sorry about that :-). I misunderstood, as I am not familiar with Scala code. Just curious, though: how are you passing two FlatMapFunctions to the flatMap function on the connected stream? The ConnectedStreams interface requires just one CoFlatMapFunction: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html

Sameer

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizerakos@hotmail.com> wrote:

Let's say I have two types sharing the same trait:

trait Event {
  def id: Id
}

case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event

Each of these event types is pushed to its own Kafka topic and consumed by a stream in Flink.

Let's say I have two streams.

Events of type A create state:

val typeAStream = env.addSource(...)
  .flatMap(someUnmarshallerForA)
  .keyBy(_.id)
  .mapWithState(...)

val typeBStream = env.addSource(...)
  .flatMap(someUnmarshallerForB)
  .keyBy(_.id)
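For reference, the Scala API's `mapWithState` takes a function of shape `(T, Option[S]) => (R, Option[S])`: it receives the current key's state and returns the output plus the new state, and only the output `R` flows downstream. The dependency-free model below mimics that contract with a per-key `Map`; `MapWithStateModel`, `run`, `runningTotals`, and the stub types for `Id` and `InfoA` are hypothetical names for illustration, not Flink API.

```scala
// Dependency-free model of the contract behind the Scala `mapWithState`:
// the user function sees the current key's state as Option[S] and returns
// (output, newState). The state map is private to this one "operator".
object MapWithStateModel {
  // Hypothetical stand-ins for the thread's Id and InfoA.
  type Id = String
  final case class InfoA(count: Int)
  final case class EventA(id: Id, info: InfoA)

  def run[K, T, R, S](events: Seq[T])(key: T => K)(
      fun: (T, Option[S]) => (R, Option[S])): Seq[R] = {
    var state = Map.empty[K, S] // per-key state, invisible to any other operator
    events.map { e =>
      val k = key(e)
      val (out, next) = fun(e, state.get(k))
      // In this model, Some(s) replaces the key's state and None clears it.
      state = next match {
        case Some(s) => state.updated(k, s)
        case None    => state - k
      }
      out // only the output is emitted; the state stays inside the operator
    }
  }

  // Example: running total of InfoA.count per id.
  def runningTotals(events: Seq[EventA]): Seq[(Id, Int)] =
    run[Id, EventA, (Id, Int), Int](events)(_.id) { (e, s) =>
      val total = s.getOrElse(0) + e.info.count
      ((e.id, total), Some(total))
    }

  def main(args: Array[String]): Unit =
    runningTotals(Seq(EventA("x", InfoA(1)), EventA("y", InfoA(5)), EventA("x", InfoA(2))))
      .foreach(println)
}
```

The point of the model is that anything downstream of `mapWithState` sees only the emitted values, never the state map itself.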

Now I want to process the events in typeBStream using the information stored in the state of typeAStream.

One approach would be to consume both topics into the same stream and then pattern match, but the number of Event subclasses may grow and they may have different loads, so I may want to keep things separate.
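The single-stream alternative could look roughly like the plain-Scala sketch below: events from both topics arrive in one stream of `Event`, and a `match` dispatches on the concrete subclass. The stub types for `Id`, `InfoA`, and `InfoB`, the `route` function, and the `sealed` modifier on `Event` are assumptions for illustration; in a Flink job the dispatch would sit inside a `flatMap` on the merged stream.

```scala
// Plain-Scala sketch of the "one stream, pattern match" alternative.
object SingleStreamSketch {
  // Hypothetical stand-ins for the thread's Id, InfoA and InfoB.
  type Id = String
  final case class InfoA(label: String)
  final case class InfoB(amount: Int)

  // Declared sealed here so the compiler can warn about unhandled subclasses.
  sealed trait Event { def id: Id }
  final case class EventA(id: Id, info: InfoA) extends Event
  final case class EventB(id: Id, info: InfoB) extends Event

  // Dispatch on the concrete subclass; every new Event type adds a case here.
  def route(e: Event): String = e match {
    case EventA(id, InfoA(label))  => s"A:$id:$label"
    case EventB(id, InfoB(amount)) => s"B:$id:$amount"
  }

  def main(args: Array[String]): Unit = {
    // Events from both topics interleaved in a single stream.
    val merged: Seq[Event] = Seq(EventA("1", InfoA("x")), EventB("1", InfoB(7)))
    merged.map(route).foreach(println)
  }
}
```

This also shows the drawback described above: all subclasses share one match and one operator, so they cannot be scaled or evolved independently.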

Would something along the lines of:

typeAStream.connect(typeBStream)
  .flatMap(
    new IdentityFlatMapFunction(),
    new SomeRichFlatMapFunctionForEventB[EventB, O] with StatefulFunction[EventB, O, G[EventA]]
      { ... }
  )

work?

I tried this approach and ended up with an NPE because the state object was not initialized (meaning it was not there).
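Keyed state in Flink belongs to a single operator and is scoped to the current key, so the state that `mapWithState` keeps for typeAStream is not visible to a downstream connected operator. A common pattern is to connect the two streams (taking the A stream before `mapWithState`), key both sides by id, for example with `connect(...).keyBy(_.id, _.id)`, and use one `RichCoFlatMapFunction` whose `flatMap1` stores EventA data in a `ValueState` obtained in `open` and whose `flatMap2` reads it for EventB. The dependency-free model below mimics that shape with a per-key `Map`; `KeyedCoFlatMapModel`, `EnrichBWithA`, `Enriched`, `process`, and the stub types are hypothetical, and a real job would use Flink's `ValueState` instead of the map.

```scala
// Dependency-free model of a keyed co-flatMap: both inputs are keyed by id,
// and a single operator owns the per-key state that both sides use.
object KeyedCoFlatMapModel {
  // Hypothetical stand-ins for the thread's Id, InfoA and InfoB.
  type Id = String
  final case class InfoA(label: String)
  final case class InfoB(amount: Int)
  final case class EventA(id: Id, info: InfoA)
  final case class EventB(id: Id, info: InfoB)
  // Hypothetical output type: an EventB enriched with the latest InfoA.
  final case class Enriched(id: Id, a: InfoA, b: InfoB)

  final class EnrichBWithA {
    // Stands in for a Flink ValueState[InfoA], which is scoped to the current key.
    private var stateByKey = Map.empty[Id, InfoA]

    // Like flatMap1: remember the latest InfoA for this key and emit nothing.
    def flatMap1(a: EventA): Seq[Enriched] = {
      stateByKey = stateByKey.updated(a.id, a.info)
      Seq.empty
    }

    // Like flatMap2: read this key's state. It may be absent (no EventA seen
    // yet for this id), so handle it as an Option rather than dereferencing null.
    def flatMap2(b: EventB): Seq[Enriched] =
      stateByKey.get(b.id).toList.map(a => Enriched(b.id, a, b.info))
  }

  // Feed an interleaved sequence of both inputs through one operator instance.
  def process(inputs: Seq[Either[EventA, EventB]]): Seq[Enriched] = {
    val op = new EnrichBWithA
    inputs.flatMap {
      case Left(a)  => op.flatMap1(a)
      case Right(b) => op.flatMap2(b)
    }
  }

  def main(args: Array[String]): Unit =
    process(Seq(
      Right(EventB("1", InfoB(3))), // no EventA for id 1 yet: dropped
      Left(EventA("1", InfoA("x"))),
      Right(EventB("1", InfoB(7))), // enriched with InfoA("x")
      Right(EventB("2", InfoB(9)))  // no EventA for id 2: dropped
    )).foreach(println)
}
```

Flink does not guarantee ordering between the two inputs of a connected stream, so an EventB can arrive before the EventA for its id. This model simply drops such events; a real job might instead buffer them in a second piece of keyed state.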


Thanks,
Aris


