flink-issues mailing list archives

From "Nico Kruber (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-6791) Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
Date Thu, 01 Jun 2017 10:42:04 GMT
Nico Kruber created FLINK-6791:
----------------------------------

             Summary: Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
                 Key: FLINK-6791
                 URL: https://issues.apache.org/jira/browse/FLINK-6791
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.2.1, 1.3.0
            Reporter: Nico Kruber


If a `MemoryStateBackend` is used as the checkpoint stream back-end of another state backend,
e.g. `RocksDBStateBackend`, further checkpoint/savepoint creation is blocked once the checkpoint
data reaches the back-end's max state size. In that case, an error message is logged at the task
manager, but the checkpoint or savepoint never completes. The job keeps running, but no further
checkpoints are made.

The following example should reproduce the problem:

{code:java}
env.setStateBackend(new RocksDBStateBackend(
		new MemoryStateBackend(1000 * 1024 * 1024, false), false));

env.enableCheckpointing(100L);

final long numKeys = 100_000L;
DataStreamSource<Tuple1<Long>> source1 =
		env.addSource(new RichParallelSourceFunction<Tuple1<Long>>() {
			private volatile boolean running = true;

			@Override
			public void run(SourceContext<Tuple1<Long>> ctx) throws Exception {
				long counter = 0;

				while (running) {
					synchronized (ctx.getCheckpointLock()) {
						ctx.collect(Tuple1.of(counter % numKeys));
						counter++;
					}

					Thread.yield();
				}
			}

			@Override
			public void cancel() {
				running = false;
			}
		});

source1.keyBy(0)
		.map(new RichMapFunction<Tuple1<Long>, Tuple1<Long>>() {
			private transient ValueState<List<Long>> val;

			@Override
			public Tuple1<Long> map(Tuple1<Long> value)
					throws Exception {
				val.update(Collections.nCopies(100, value.f0));
				return value;
			}

			@Override
			public void open(final Configuration parameters) throws Exception {
				ValueStateDescriptor<List<Long>> descriptor =
						new ValueStateDescriptor<>(
								"data", // the state name
								TypeInformation.of(new TypeHint<List<Long>>() {
								}) // type information
						);
				val = getRuntimeContext().getState(descriptor);
			}
		}).uid("identity-map-with-state")
		.addSink(new DiscardingSink<Tuple1<Long>>());

env.execute("failingsnapshots");
{code}
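
To illustrate the size limit described above without a Flink cluster, here is a minimal plain-Java sketch of a size-capped in-memory stream. All names in it (`CappedMemoryStream`, `closeAndGetBytes`, `trySnapshot`) are hypothetical and only model the behaviour; they are not Flink's classes. Rejecting only when the data is strictly larger than the cap is also an assumption. The sketch does not claim to show where Flink handles the error; it only shows that a snapshot larger than the cap has to be reported as failed by whoever catches the exception.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for a size-capped in-memory checkpoint stream (not a Flink class). */
public class CappedMemoryStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxStateSize;

    public CappedMemoryStream(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }

    /** Appends snapshot data to the in-memory buffer. */
    public void write(byte[] data) {
        buffer.write(data, 0, data.length);
    }

    /** Finishes the snapshot; fails if the collected bytes exceed the cap (assumed semantics). */
    public byte[] closeAndGetBytes() throws IOException {
        if (buffer.size() > maxStateSize) {
            throw new IOException(
                    "State size " + buffer.size() + " exceeds maximum " + maxStateSize);
        }
        return buffer.toByteArray();
    }

    /** Returns true if a snapshot of the given size completes, false if it is rejected. */
    public static boolean trySnapshot(int maxStateSize, int stateBytes) {
        CappedMemoryStream stream = new CappedMemoryStream(maxStateSize);
        stream.write(new byte[stateBytes]);
        try {
            stream.closeAndGetBytes();
            return true;
        } catch (IOException e) {
            // The caller has to treat this as a failed snapshot. If the error
            // were only logged here, the snapshot would stay pending.
            return false;
        }
    }
}
```

In this model, `CappedMemoryStream.trySnapshot(1024, 512)` completes, while `CappedMemoryStream.trySnapshot(1024, 2048)` is rejected and must be surfaced to the caller as a failure.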



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
