flink-user mailing list archives

From Jack Huang <jackhu...@mz.com>
Subject Periodically evicting internal states when using mapWithState()
Date Tue, 07 Jun 2016 01:04:24 GMT
Hi all,

I have an incoming stream of event objects, each carrying a session ID. I am
writing a task that aggregates the events by session. The general logic
looks like this:

case class Event(sessionId: Int, data: String)
case class Session(id: Int, var events: List[Event])

val events = ... // some source
  .keyBy((event: Event) => event.sessionId)
  .mapWithState((event: Event, state: Option[Session]) => {
    // Start a new session the first time this key is seen
    val session = state.getOrElse(Session(id = event.sessionId, events = List()))
    session.events = session.events :+ event
    // Emit the updated session and keep it as the new state for this key
    (session, Some(session))
  })

The problem is that there is no reliable way of knowing the end of a
session, since events are likely to get lost. If I keep this process
running, the number of stored sessions will keep growing until it fills up
the disk.

Is there a recommended way of periodically evicting sessions that are too
old (e.g. a day old)?
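
To make the question concrete, the behavior I am after can be sketched in
plain Scala, outside of Flink. This is only an illustration under
assumptions: the lastSeen field, the maxAgeMs threshold, and the update and
evictExpired helpers are hypothetical names that are not part of my job or
of the Flink API, and the in-memory Map merely stands in for the keyed state.

```scala
// Plain-Scala illustration only; no Flink API is used here.
case class Event(sessionId: Int, data: String)
// lastSeen (epoch millis) is a hypothetical field added for this sketch.
case class Session(id: Int, events: List[Event], lastSeen: Long)

object EvictionSketch {
  // Hypothetical threshold: one day in milliseconds.
  val maxAgeMs: Long = 24L * 60 * 60 * 1000

  // Append an event to its session and refresh the session's timestamp.
  def update(store: Map[Int, Session], event: Event, now: Long): Map[Int, Session] = {
    val old = store.getOrElse(event.sessionId, Session(event.sessionId, List(), now))
    store.updated(event.sessionId, old.copy(events = old.events :+ event, lastSeen = now))
  }

  // Drop every session that has not received an event within maxAgeMs.
  def evictExpired(store: Map[Int, Session], now: Long): Map[Int, Session] =
    store.filter { case (_, s) => now - s.lastSeen <= maxAgeMs }
}
```

What I do not know is how to trigger something like evictExpired
periodically against the state that mapWithState() keeps per key.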

