flink-user mailing list archives

From Victor Godoy Poluceno <victorpoluc...@gmail.com>
Subject Unable to make mapWithState work correctly
Date Tue, 25 Jul 2017 12:45:49 GMT

I am trying to write a simple streaming program that counts values from a
Kafka topic in a fault-tolerant manner, like this:

import java.util.Properties

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val config: Configuration = new Configuration()
config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)

val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

val stream = env
    .addSource(new FlinkKafkaConsumer010[String]("test", new
SimpleStringSchema(), properties))
    .map((_, 1))
    .keyBy(_._1)  // mapWithState is only available on a keyed stream
    .mapWithState((in: (String, Int), count: Option[Int]) => {
      val newCount = in._2 + count.getOrElse(0)
      ((in._1, newCount), Some(newCount))
    })

stream.print()
env.execute()


The idea is to use the filesystem state backend to persist the computation
state (count) and to restore that state in case of failure or restart. I
have a program that injects the same key into Kafka repeatedly. But I am
unable to make Flink work correctly: every time Flink restarts, the state
is empty, so the count starts from zero again. What am I missing?
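For reference, this is roughly how I understand checkpointing would have to
be switched on for state to be snapshotted at all (a sketch; the interval
value here is an arbitrary example, not from my original program):

```scala
// Sketch: enable periodic checkpoints so that managed state is actually
// snapshotted to the configured filesystem backend. The 5000 ms interval
// is an assumed example value.
env.enableCheckpointing(5000)
```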

I am running this on a local environment (sbt run) with Flink 1.3.1, Java
1.8.0_131, and Ubuntu 16.04.


Victor Godoy Poluceno
