Hi guys,
I want to sessionize this stream: 1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5, ... into these sessions:

1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5
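
To make the expected grouping concrete, here is a rough sketch in plain Scala over an in-memory list. It is not Flink code and says nothing about triggers or windows; the names SplitRunsExample and splitRuns are made up for this illustration.

```scala
object SplitRunsExample {
  // Splits a list into runs of equal consecutive values.
  // Walks the list from the right and either extends the current
  // run (same value) or starts a new one (value changed).
  def splitRuns[A](xs: List[A]): List[List[A]] =
    xs.foldRight(List.empty[List[A]]) {
      case (x, (run @ (y :: _)) :: rest) if x == y => (x :: run) :: rest
      case (x, acc)                                => List(x) :: acc
    }

  def main(args: Array[String]): Unit = {
    val stream = List(1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 0, 3, 3, 3, 5)
    // Prints one session per line, matching the list above.
    splitRuns(stream).foreach(run => println(run.mkString(",")))
  }
}
```

The point is that a value such as 3 can appear in more than one session, so grouping by key alone is not enough; the boundaries depend on the order of arrival.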

I've written a custom trigger that detects when the stream elements change from 1 to 2 (2 to 3, 3 to 0, and so on) and then fires. But this doesn't solve the problem: when I process the first 2 and fire the trigger, the window is already [1,1,1,2], whereas I need the trigger to fire on the last 1.


Here is the pseudocode of the onElement function in my custom trigger class:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    // Fire when the value differs from the previous element's value.
    // By this point the new element is already part of the window.
    val changed = prevState != element.value
    prevState = element.value
    if (changed) TriggerResult.FIRE else TriggerResult.CONTINUE
}
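
The off-by-one can be reproduced without Flink. The sketch below only mimics the order of events described above (the element joins the window first, then the change is detected), and it assumes the very first element does not fire. TriggerSimulation and firstFiredWindow are hypothetical names, not part of any Flink API.

```scala
object TriggerSimulation {
  // Returns the window contents at the moment of the first fire,
  // or None if the value never changes.
  def firstFiredWindow(xs: List[Int]): Option[List[Int]] = {
    // Index of the first adjacent pair whose values differ.
    val idx = xs.zip(xs.drop(1)).indexWhere { case (a, b) => a != b }
    // The window holds everything up to and including the element
    // that caused the change, which is one element too many.
    if (idx < 0) None else Some(xs.take(idx + 2))
  }

  def main(args: Array[String]): Unit = {
    val stream = List(1, 1, 1, 2, 2, 2, 2, 2, 3)
    println(firstFiredWindow(stream)) // Some(List(1, 1, 1, 2))
  }
}
```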

How can I solve this problem?

--
Milād Khājavi
http://blog.khajavi.ir
Having the source means you can do it yourself.
I tried to change the world, but I couldn’t find the source code.