flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shikhar <shik...@schmizz.net>
Subject Periodic actions
Date Fri, 04 Mar 2016 02:08:43 GMT
I am trying to have my job also run a periodic action by using a custom
source that emits a dummy element periodically and a sink that executes the
callback, as shown in the code below. However as soon as I start the job and
check the state in the JobManager UI this particular Sink->Source combo is
in state 'FINISHED' I know based on logging that the sink never received any
elements. What am I doing wrong?

      .addSink { _ => foo() }

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.concurrent.duration.FiniteDuration

case class PeriodicSource(interval: FiniteDuration) extends
SourceFunction[Unit] {
  @volatile private var active = false

  override def run(ctx: SourceContext[Unit]): Unit = {
    while (active) {
      if (active) {

  override def cancel(): Unit = {
    active = false

  private def sleep(): Unit = {
    val startTimeMs = System.currentTimeMillis()
    val desiredSleepMs = interval.toMillis
    do {
      Thread.sleep(math.min(desiredSleepMs, 100))
    } while (active && (System.currentTimeMillis() - startTimeMs) <

View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Periodic-actions-tp5290.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

View raw message