flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Anchit Jatana <development.anc...@gmail.com>
Subject RE: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity
Date Sat, 22 Oct 2016 04:58:10 GMT
Hi Bart,

Thank you so much for sharing the approach. Looks like this solved my
problem. Here's what I have as an implementation for my use-case:

package org.apache.flink.quickstart

import org.apache.flink.api.common.state.{ ReducingState,
ReducingStateDescriptor, ValueState, ValueStateDescriptor }
import
org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction
import org.apache.flink.streaming.api.windowing.time.Time
import
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{ Trigger,
TriggerResult }
import org.apache.flink.streaming.api.windowing.windows.Window
import org.slf4j.LoggerFactory

/**
 * Processing-time inactivity trigger.
 *
 * Every incoming element fires the window (so the window function sees the
 * contents accumulated so far) and re-arms a single processing-time timer.
 * When that timer finally fires, i.e. no element arrived for the configured
 * gap, the window contents are purged.
 *
 * NOTE(review): the parameter is named `sessionPauseHours` but the value is
 * converted with `Time.seconds`. The original behaviour (seconds) is kept
 * here; confirm which unit is actually intended.
 *
 * @param sessionPauseHours length of the inactivity gap (currently interpreted
 *                          as seconds, see the note above)
 * @tparam E type of the elements flowing through the window
 */
class sessionTrigger[E](val sessionPauseHours: Long) extends Trigger[E, Window] {

  /** Descriptor of the partitioned state holding the timestamp of the pending timer, if any. */
  val timeState: ValueStateDescriptor[Option[Long]] =
    new ValueStateDescriptor[Option[Long]]("sessionTimer", classOf[Option[Long]], None)

  /**
   * Replaces the pending inactivity timer with a fresh one and fires the window.
   */
  override def onElement(t: E, l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    val pendingTimer: ValueState[Option[Long]] = triggerContext.getPartitionedState(timeState)

    // Drop the previously registered timer (if there is one) so that only the
    // most recent element can cause the purge.
    pendingTimer.value().foreach(ts => triggerContext.deleteProcessingTimeTimer(ts))

    // Arm a new timer one inactivity gap from now and remember its timestamp.
    val nextTimeout =
      triggerContext.getCurrentProcessingTime + Time.seconds(sessionPauseHours).toMilliseconds()
    pendingTimer.update(Some(nextTimeout))
    triggerContext.registerProcessingTimeTimer(nextTimeout)

    TriggerResult.FIRE
  }

  /**
   * The inactivity timer fired: forget its timestamp and purge the window.
   */
  override def onProcessingTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    // The timer that just fired no longer exists; clearing the state avoids
    // keeping a stale timestamp around after the window contents are gone.
    triggerContext.getPartitionedState(timeState).clear()
    TriggerResult.PURGE
  }

  /** Event time is not used by this trigger. */
  override def onEventTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /**
   * Releases everything this trigger holds for the window: the pending
   * processing-time timer and the state that remembers it.
   */
  override def clear(w: Window, triggerContext: TriggerContext): Unit = {
    val pendingTimer = triggerContext.getPartitionedState(timeState)
    pendingTimer.value().foreach(ts => triggerContext.deleteProcessingTimeTimer(ts))
    pendingTimer.clear()
  }
}

Regards,
Anchit



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-windows-Evaluation-of-addition-of-element-window-expires-gets-discarded-after-some-set-time-y-tp9665p9676.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Mime
View raw message