flink-dev mailing list archives

From "Nico Kruber (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-6791) Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
Date Thu, 01 Jun 2017 10:42:04 GMT
Nico Kruber created FLINK-6791:

             Summary: Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
                 Key: FLINK-6791
                 URL: https://issues.apache.org/jira/browse/FLINK-6791
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.2.1, 1.3.0
            Reporter: Nico Kruber

If the `MemoryStateBackend` is used as the checkpoint stream back-end of another state back-end,
e.g. the `RocksDBStateBackend`, it blocks further checkpoint/savepoint creation once the checkpoint
data reaches the back-end's maximum state size. In that case, an error message is logged at the
TaskManager, but the savepoint or checkpoint never completes; the job keeps running, yet no further
checkpoints are made.
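The size limit that the report runs into can be illustrated with a small stand-alone sketch. The class below is hypothetical and is not Flink's implementation. It only assumes that a memory-backed checkpoint stream buffers all bytes on the heap and rejects a write with an `IOException` once the configured maximum state size would be exceeded. It does not model how Flink reacts to that failure (per the report, the error is only logged at the TaskManager and the checkpoint never completes).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical stand-in for a memory-backed checkpoint stream (not Flink's class):
 * all checkpoint bytes are kept on the heap, and any write that would push the
 * total beyond maxStateSize is rejected before anything is buffered.
 */
final class BoundedMemoryCheckpointStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxStateSize;

    BoundedMemoryCheckpointStream(int maxStateSize) {
        if (maxStateSize <= 0) {
            throw new IllegalArgumentException("maxStateSize must be positive");
        }
        this.maxStateSize = maxStateSize;
    }

    @Override
    public void write(int b) throws IOException {
        ensureCapacity(1);
        buffer.write(b); // stores the low-order byte of b
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException("invalid offset/length");
        }
        ensureCapacity(len);
        buffer.write(b, off, len);
    }

    /** Fails if appending `additional` bytes would exceed the configured limit. */
    private void ensureCapacity(int additional) throws IOException {
        long newSize = (long) buffer.size() + additional;
        if (newSize > maxStateSize) {
            throw new IOException("Size of the state (" + newSize
                    + " bytes) exceeds the maximum of " + maxStateSize + " bytes");
        }
    }

    /** Number of bytes buffered so far. */
    int size() {
        return buffer.size();
    }

    /** Copy of the bytes buffered so far. */
    byte[] toByteArray() {
        return buffer.toByteArray();
    }
}
```

With `maxStateSize` set to 8 bytes, writing 5 bytes succeeds, a further 4-byte write is rejected with an `IOException`, and the 5 bytes already buffered stay untouched.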

The following example should reproduce the problem:

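import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class MemoryCheckpointStreamRepro { // hypothetical wrapper; the report shows only the job body
	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// RocksDB holds the working state; the MemoryStateBackend (max. state size 1000 MiB, `false` = no async snapshots) receives the checkpoint streams.
		// The second argument of the original RocksDBStateBackend call is cut off in the archived report; the single-argument constructor is used instead.
		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend(1000 * 1024 * 1024, false)));
		// Checkpointing must be enabled for the problem to show; the 1 s interval is an arbitrary choice.
		env.enableCheckpointing(1000L);

		final long numKeys = 100_000L;
		DataStreamSource<Tuple1<Long>> source1 =
				env.addSource(new RichParallelSourceFunction<Tuple1<Long>>() {
					private volatile boolean running = true;
					@Override
					public void run(SourceContext<Tuple1<Long>> ctx) throws Exception {
						long counter = 0;
						while (running) {
							// emit under the checkpoint lock so that emission and checkpoints do not interleave
							synchronized (ctx.getCheckpointLock()) {
								ctx.collect(Tuple1.of(counter % numKeys));
								counter++; // cycle through all numKeys keys
							}
						}
					}
					@Override
					public void cancel() {
						running = false;
					}
				});

		source1
				.keyBy(0) // ValueState is keyed state, so the stream must be keyed first
				.map(new RichMapFunction<Tuple1<Long>, Tuple1<Long>>() {
					private transient ValueState<List<Long>> val;
					@Override
					public Tuple1<Long> map(Tuple1<Long> value) throws Exception {
						// keep 100 copies of the key for every key, so the total state grows with numKeys
						val.update(Collections.nCopies(100, value.f0));
						return value;
					}
					@Override
					public void open(final Configuration parameters) throws Exception {
						ValueStateDescriptor<List<Long>> descriptor = new ValueStateDescriptor<>(
								"data", // the state name
								TypeInformation.of(new TypeHint<List<Long>>() {})); // type information
						val = getRuntimeContext().getState(descriptor);
					}
				})
				.addSink(new DiscardingSink<Tuple1<Long>>());

		env.execute();
	}
}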


This message was sent by Atlassian JIRA
