flink-user mailing list archives

From Stefano Baghino <stefano.bagh...@radicalbit.io>
Subject Re: mutable hashmap outside of stream, does it get snapshotted ?
Date Thu, 07 Apr 2016 16:30:32 GMT
Hi Bart,

to make sure that the state is checkpointed you have to:

   1. configure your Flink installation with a reliable state backend
   (optional during development; you can read more about it here:
   <https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html#configuration>)
   2. explicitly enable checkpointing in your program (it's just a couple of
   lines of code; see how here:
   <https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/fault_tolerance.html>)
   3. make your operators checkpoint their data, either by using Flink's
   key/value state interface or by implementing the `Checkpointed` interface
   to checkpoint instance fields (the semantics of the two approaches are
   slightly different; you can read more about them here:
   <https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html>)
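The three steps above could look roughly like this in a Scala job. This is a minimal sketch assuming the Flink 1.0 Scala API described in the linked docs (newer releases changed these interfaces). `CountingMapper`, `CheckpointedJob`, the 5-second interval and the HDFS path are hypothetical placeholders, and the state backend is set in the program here, although it can also be configured cluster-wide as the first link describes.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.scala._

// Step 3 (hypothetical example): an instance field registered as checkpointed
// state via Checkpointed. Flink calls snapshotState() on each checkpoint and
// restoreState() when the job recovers. java.lang.Long is used because the
// state type must be java.io.Serializable.
class CountingMapper
    extends MapFunction[String, (String, Long)]
    with Checkpointed[java.lang.Long] {

  private var seen: Long = 0L

  override def map(value: String): (String, Long) = {
    seen += 1
    (value, seen)
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): java.lang.Long =
    java.lang.Long.valueOf(seen)

  override def restoreState(state: java.lang.Long): Unit =
    seen = state.longValue()
}

object CheckpointedJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Step 2: take a checkpoint every 5 seconds (placeholder interval).
    env.enableCheckpointing(5000)
    // Step 1: keep snapshots on a durable file system (placeholder path).
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))

    env.fromElements("a", "b", "c")
      .map(new CountingMapper)
      .print()

    env.execute("checkpointed counting")
  }
}
```

Only `seen` is snapshotted because it is returned from `snapshotState()`; any other field of the function, or of a global `object`, is not part of the checkpoint.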

To access the checkpointed key/value state, your operator has to implement
the `RichFunction` interface, usually by extending the rich variant of the
function you need (for example `RichMapFunction` instead of `MapFunction`),
which gives you `getRuntimeContext`.
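As an illustration of the rich-function route, here is a sketch of a `RichMapFunction` that keeps a per-key counter in key/value state. It assumes the Flink 1.0 `ValueStateDescriptor` constructor taking a name, type information and default value; `CountPerUri` is a hypothetical name, and the function must run on a keyed stream (after `keyBy`) for the state to be scoped per key.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

// Hypothetical example: counts how often each URI has been seen.
// The counter lives in Flink's key/value state, so it is kept per key
// and included in checkpoints once checkpointing is enabled.
class CountPerUri extends RichMapFunction[String, (String, Long)] {
  // Handle to the keyed state, obtained from the runtime context in open().
  @transient private var count: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count", createTypeInformation[Long], 0L))
  }

  override def map(uri: String): (String, Long) = {
    val next = count.value() + 1 // value() returns the default 0L for a new key
    count.update(next)
    (uri, next)
  }
}
```

Usage on a keyed stream: `env.fromElements("/a", "/b", "/a").keyBy(uri => uri).map(new CountPerUri)`.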

For your use case in particular, I don't know of a way to checkpoint state
shared between different operators; perhaps you can refactor your code so
that the state is encapsulated in a single operator implementation and then
passed through your pipeline as a parameter to the following operators.
Would that work?
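A sketch of that refactoring, assuming the Flink 1.0 Scala API: the `uriLookup` table moves out of the global `object` (whose fields Flink does not snapshot) into one operator that implements `Checkpointed`, and each record leaves the operator carrying its id, so later operators never read the shared map. `UriIdAssigner`, `UriJob` and the sequential id scheme are hypothetical; the operator runs with parallelism 1 so that there is a single table for the whole job.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.scala._
import scala.collection.mutable

// Hypothetical operator owning the URI -> id table. The table is returned
// from snapshotState(), so it becomes part of every checkpoint and is handed
// back through restoreState() after a failure.
class UriIdAssigner
    extends MapFunction[String, (String, Int)]
    with Checkpointed[mutable.HashMap[String, Int]] {

  private var uriLookup = mutable.HashMap.empty[String, Int]

  override def map(uri: String): (String, Int) = uriLookup.get(uri) match {
    case Some(id) => (uri, id)
    case None =>
      val id = uriLookup.size // next free id
      uriLookup(uri) = id
      (uri, id)
  }

  // Return a copy so later updates cannot leak into a snapshot being written.
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): mutable.HashMap[String, Int] =
    uriLookup.clone()

  override def restoreState(state: mutable.HashMap[String, Int]): Unit =
    uriLookup = state
}

object UriJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // placeholder interval

    env.fromElements("/a", "/b", "/a")
      .map(new UriIdAssigner)
      .setParallelism(1) // one instance, so ids are unique across the job
      .print()           // downstream operators get (uri, id) with each record

    env.execute("uri ids")
  }
}
```

Running this operator with parallelism 1 trades throughput for a single consistent table; if ids only need to be consistent per key, keyed state would let the operator scale out instead.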

I apologize for only providing pointers to the docs in my reply, but
checkpointing deserves a good explanation and I feel the docs do the job
pretty well. I will gladly help if you have any doubts.

Hope I've been of some help.

On Thu, Apr 7, 2016 at 3:41 PM, Bart van Deenen <bartvandeenen@fastmail.fm>
wrote:

> Hi all
>
> I have a DataStream transformation that updates a mutable
> HashMap that exists outside of the stream.
>
> So it's something like
>
> object FlinkJob {
>   val uriLookup = mutable.HashMap.empty[String, Int]
>
>
>   def main(args: Array[String]) {
>
>     val stream: DataStream = ...
>
>     stream.keyBy(1).timeWindow(..).fold(..)
>     .window(..)
>     .map(..).fold(..)
>     .addSink(..)
>   }
> }
>
> where the uriLookup hashmap gets updated inside the stream
> transformation,
> and is serialized in the step before the addSink
>
> It works fine, however
>
> Does the snapshotting mechanism in case of a node failure actually
> serialize this map?
>
> And out of curiosity, can I actually see what data exists inside the
> snapshot data?
>
> Thanks.
>
> Bart
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
