@Aljoscha:
For this word count example I am using a Kafka topic as the input stream. The problem is that when I cancel the task and restart it, the task loses the word counts accumulated so far and starts counting from 1 again. Am I missing something basic here?

@Stefano:
I also tried to implement the Checkpointed interface but had no luck either. Canceling and restarting the task did not restore the state. Here is my class:

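import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.scala._

inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
  .keyBy({s => s})
  .map(new StatefulCounter)

class StatefulCounter extends RichMapFunction[String, (String, Int)] with Checkpointed[Integer] {
  private var count: Int = 0

  override def map(in: String): (String, Int) = {
    count += 1
    (in, count)
  }

  // Called when a checkpoint is taken; the returned value is what gets stored.
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer = count

  // Called on recovery with the value from the last completed checkpoint.
  override def restoreState(state: Integer): Unit = {
    count = state
  }
}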
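A point worth keeping in mind with this approach: `Checkpointed` state belongs to a parallel operator instance rather than to a key, so after `keyBy` every word routed to the same instance increments the same `count`. The plain-Scala model below (no Flink involved; `SharedCounter` and `PerKeyCounter` are hypothetical names) sketches the difference between that and a per-key counter, which is the behavior `mapWithState` and the Key/Value state interface are meant to provide.

```scala
import scala.collection.mutable

// Plain-Scala model, not Flink code: it only illustrates how the two kinds
// of state behave when several words reach the same operator instance.

// Models a Checkpointed-style field: one counter for the whole instance.
class SharedCounter {
  private var count: Int = 0
  def map(word: String): (String, Int) = {
    count += 1
    (word, count)
  }
}

// Models keyed state: one counter per key (here, per word).
class PerKeyCounter {
  private val counts = mutable.Map.empty[String, Int]
  def map(word: String): (String, Int) = {
    val next = counts.getOrElse(word, 0) + 1
    counts(word) = next
    (word, next)
  }
}

val words = List("a", "b", "a")
val shared = new SharedCounter
val perKey = new PerKeyCounter
val sharedOut = words.map(shared.map) // List((a,1), (b,2), (a,3))
val perKeyOut = words.map(perKey.map) // List((a,1), (b,1), (a,2))
```

With the shared counter the second "a" is reported as 3, because the "b" in between bumped the same field.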


Thanks,

Jack Huang

On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:
My bad, thanks for pointing that out.

On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
Hi,
the *WithState() family of functions (e.g. `mapWithState`) uses the Key/Value state interface internally, so your original code should work as far as state is concerned.
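To make the *WithState() contract concrete: the function receives the current element plus the previous state for that element's key as an `Option`, and returns the output together with the new state. The sketch below is a plain-Scala model of that contract, with a mutable `Map` standing in for the keyed state store; `mapWithStateModel` is a hypothetical helper and not how Flink implements it.

```scala
import scala.collection.mutable

// Plain-Scala model of the mapWithState contract, not Flink's implementation.
// `store` stands in for the per-key Key/Value state.
def mapWithStateModel[K, T, R, S](input: Seq[T], key: T => K)(
    fun: (T, Option[S]) => (R, Option[S])): Seq[R] = {
  val store = mutable.Map.empty[K, S]
  input.map { in =>
    val k = key(in)
    val (out, newState) = fun(in, store.get(k))
    // In this model, Some(s) replaces the key's state and None removes it.
    newState match {
      case Some(s) => store(k) = s
      case None    => store.remove(k)
    }
    out
  }
}

// Same counting function as in the original mapWithState call.
val counted = mapWithStateModel(Seq("to", "be", "or", "not", "to", "be"), (s: String) => s) {
  (in: String, count: Option[Int]) =>
    val newCount = count.getOrElse(0) + 1
    ((in, newCount), Some(newCount))
}
// counted == List((to,1), (be,1), (or,1), (not,1), (to,2), (be,2))
```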

On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <stefano.baghino@radicalbit.io> wrote:
Hi Jack,

it seems you correctly enabled checkpointing by calling `env.enableCheckpointing`. However, your UDFs also have to either implement the Checkpointed interface or use the Key/Value State interface so that the state of the computation is included in the snapshots.
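For the Checkpointed route, the contract is a pair of methods: `snapshotState(checkpointId, checkpointTimestamp)` returns the value to store, and `restoreState(state)` hands it back to a fresh instance. The plain-Scala model below mimics that cycle with a hypothetical `CheckpointedLike` trait (Flink's real interface is `org.apache.flink.streaming.api.checkpoint.Checkpointed`); it only illustrates the contract and does not trigger or persist checkpoints the way Flink does.

```scala
// Hypothetical stand-in for Flink's Checkpointed interface, for illustration only.
trait CheckpointedLike[S] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): S
  def restoreState(state: S): Unit
}

class CounterModel extends CheckpointedLike[Int] {
  private var count = 0
  def map(in: String): (String, Int) = { count += 1; (in, count) }
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = count
  def restoreState(state: Int): Unit = { count = state }
}

// Process a few records, then take a "checkpoint".
val before = new CounterModel
List("x", "y").foreach(before.map)
val snapshot = before.snapshotState(checkpointId = 1L, checkpointTimestamp = 0L) // 2

// A fresh instance starts from zero unless the snapshot is handed back.
val fresh = new CounterModel
val withoutRestore = fresh.map("z") // (z,1)

val recovered = new CounterModel
recovered.restoreState(snapshot)
val withRestore = recovered.map("z") // (z,3)
```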

The documentation explains how to define your functions so that they checkpoint the state far better than I could in this post: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html

I hope I've been of some help; I'll gladly help you further if you need it.

On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
Hi,
what seems to be the problem?

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 03:52 Jack Huang <jackhuang@machinezone.com> wrote:
Hi all,

I am doing a simple word count example and want to checkpoint the accumulated word counts. I am not having any luck getting the counts saved and restored. Can someone help?

env.enableCheckpointing(1000) // checkpoint interval in milliseconds
env.setStateBackend(new MemoryStateBackend()) // snapshots are kept on the JobManager's heap

...

inStream
  .keyBy({ s => s })
  .mapWithState((in: String, count: Option[Int]) => {
    val newCount = count.getOrElse(0) + 1
    ((in, newCount), Some(newCount))
  })
  .print()


Thanks,

Jack Huang



--
BR,
Stefano Baghino

Software Engineer @ Radicalbit