flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...
Date Wed, 26 Apr 2017 19:06:27 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3778#discussion_r113446953
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointFrom12MigrationITCase.java
---
    @@ -0,0 +1,771 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +import org.apache.flink.api.common.accumulators.IntCounter;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.operators.Triggerable;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.Collector;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This verifies that we can restore a complete job from a Flink 1.2 savepoint.
    + *
    + * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
    + *
    +	 * <p>The tests will time out if they don't see the required number of successful checks within
    + * a time limit.
    + */
    +public class StatefulUDFSavepointFrom12MigrationITCase extends SavepointMigrationTestBase
{
    +	/** Number of elements each source emits; also the expected count for the per-element check accumulators. */
    +	private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +	/**
    +	 * This has to be manually executed to create the savepoint on Flink 1.2.
    +	 *
    +	 * <p>This variant uses the {@link MemoryStateBackend}; the RocksDB savepoint is created by
    +	 * {@link #testCreateSavepointOnFlink12WithRocksDB()}.
    +	 */
    +	@Test
    +	@Ignore
    +	public void testCreateSavepointOnFlink12() throws Exception {
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +		// memory state backend variant (the RocksDB variant is a separate test)
    +		env.setStateBackend(new MemoryStateBackend());
    +		env.enableCheckpointing(500);
    +		env.setParallelism(4);
    +		env.setMaxParallelism(4);
    +
    +		// The uids below must stay identical to the ones used by the restore tests so that the
    +		// savepoint state can be mapped back onto the corresponding operators.
    +		env
    +				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
    +				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    +				.keyBy(0)
    +				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    +				.keyBy(0)
    +				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    +				.keyBy(0)
    +				.transform(
    +						"custom_operator",
    +						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
    +						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
    +				.keyBy(0)
    +				.transform(
    +						"timely_stateful_operator",
    +						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
    +						new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
    +				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
    +
    +		executeAndSavepoint(
    +				env,
    +				"src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint",
    +				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    +	}
    +
    +	/**
    +	 * Creates the Flink 1.2 savepoint for the RocksDB state backend. Like the memory-backend
    +	 * variant it is ignored by default and has to be run manually on Flink 1.2.
    +	 *
    +	 * <p>The job graph (including all operator uids) is the same as in the memory-backend
    +	 * variant; only the state backend and the savepoint path handed to
    +	 * {@code executeAndSavepoint} differ.
    +	 */
    +	@Test
    +	@Ignore
    +	public void testCreateSavepointOnFlink12WithRocksDB() throws Exception {
    +		final String savepointPath = "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb";
    +
    +		final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    +		environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +		// RocksDB for working state, with the memory backend used for checkpoint streams
    +		environment.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +		environment.enableCheckpointing(500);
    +		environment.setParallelism(4);
    +		environment.setMaxParallelism(4);
    +
    +		environment
    +				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
    +				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    +				.keyBy(0)
    +				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    +				.keyBy(0)
    +				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    +				.keyBy(0)
    +				.transform(
    +						"custom_operator",
    +						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
    +						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
    +				.keyBy(0)
    +				.transform(
    +						"timely_stateful_operator",
    +						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
    +						new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
    +				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
    +
    +		executeAndSavepoint(
    +				environment,
    +				savepointPath,
    +				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    +	}
    +
    +
    +	@Test
    +	public void testSavepointRestoreFromFlink12() throws Exception {
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +		// we only test memory state backend yet
    +		env.setStateBackend(new MemoryStateBackend());
    +		env.enableCheckpointing(500);
    +		env.setParallelism(4);
    +		env.setMaxParallelism(4);
    +
    +		env
    +				.addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
    +				.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    +				.keyBy(0)
    +				.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    +				.keyBy(0)
    +				.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    +				.keyBy(0)
    +				.transform(
    +						"custom_operator",
    +						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
    +						new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
    +				.keyBy(0)
    +				.transform(
    +						"timely_stateful_operator",
    +						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
    +						new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
    +				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
    +
    +		restoreAndExecute(
    +				env,
    +				getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"),
    +				new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
1),
    +				new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    +	}
    +
    +	@Test
    +	public void testSavepointRestoreFromFlink12FromRocksDB() throws Exception {
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +		// we only test memory state backend yet
    +		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +		env.enableCheckpointing(500);
    +		env.setParallelism(4);
    +		env.setMaxParallelism(4);
    +
    +		env
    +				.addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
    +				.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    +				.keyBy(0)
    +				.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    +				.keyBy(0)
    +				.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    +				.keyBy(0)
    +				.transform(
    +						"custom_operator",
    +						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
    +						new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
    +				.keyBy(0)
    +				.transform(
    +						"timely_stateful_operator",
    +						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
    +						new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
    +				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
    +
    +		restoreAndExecute(
    +				env,
    +				getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"),
    +				new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
1),
    +				new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR,
NUM_SOURCE_ELEMENTS),
    +				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    +	}
    +
    +	/**
    +	 * Source used when creating the savepoint. It emits {@code numElements} tuples
    +	 * {@code (i, i)} and snapshots a fixed string through the legacy {@link Checkpointed}
    +	 * interface, then idles until cancelled.
    +	 */
    +	private static class LegacyCheckpointedSource
    +			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
    +
    +		/** The value written into every snapshot; the restoring source checks against it. */
    +		public static final String CHECKPOINTED_STRING = "Here be dragons!";
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		private volatile boolean isRunning = true;
    +
    +		private final int numElements;
    +
    +		public LegacyCheckpointedSource(int numElements) {
    +			this.numElements = numElements;
    +		}
    +
    +		@Override
    +		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
    +
    +			ctx.emitWatermark(new Watermark(0));
    +
    +			// emit all elements while holding the checkpoint lock so no snapshot interleaves
    +			synchronized (ctx.getCheckpointLock()) {
    +				for (long i = 0; i < numElements; i++) {
    +					ctx.collect(new Tuple2<>(i, i));
    +				}
    +			}
    +
    +			// don't emit a final watermark so that we don't trigger the registered event-time
    +			// timers
    +			while (isRunning) {
    +				Thread.sleep(20);
    +			}
    +		}
    +
    +		@Override
    +		public void cancel() {
    +			isRunning = false;
    +		}
    +
    +		@Override
    +		public void restoreState(String state) throws Exception {
    +			assertEquals(CHECKPOINTED_STRING, state);
    +		}
    +
    +		@Override
    +		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    +			return CHECKPOINTED_STRING;
    +		}
    +	}
    +
    +	/**
    +	 * Restore-side counterpart of {@link LegacyCheckpointedSource}. It receives the legacy state
    +	 * through {@link CheckpointedRestoring} and asserts it at the start of {@code run()},
    +	 * counting the successful check once. It then emits a watermark of 1000 and
    +	 * {@code numElements} tuples {@code (i, i)}, and idles until cancelled.
    +	 */
    +	private static class CheckingRestoringSource
    +			extends RichSourceFunction<Tuple2<Long, Long>>
    +			implements CheckpointedRestoring<String> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringSource.class + "_RESTORE_CHECK";
    +
    +		private volatile boolean isRunning = true;
    +
    +		private final int numElements;
    +
    +		/**
    +		 * Value handed to {@link #restoreState}; {@code null} until a restore happens. It is
    +		 * runtime-only, hence transient (matching CheckingRestoringFlatMap).
    +		 */
    +		private transient String restoredState;
    +
    +		public CheckingRestoringSource(int numElements) {
    +			this.numElements = numElements;
    +		}
    +
    +		@Override
    +		public void open(Configuration parameters) throws Exception {
    +			super.open(parameters);
    +
    +			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
    +		}
    +
    +		@Override
    +		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
    +			assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
    +			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
    +
    +			// immediately trigger any set timers
    +			ctx.emitWatermark(new Watermark(1000));
    +
    +			synchronized (ctx.getCheckpointLock()) {
    +				for (long i = 0; i < numElements; i++) {
    +					ctx.collect(new Tuple2<>(i, i));
    +				}
    +			}
    +
    +			while (isRunning) {
    +				Thread.sleep(20);
    +			}
    +		}
    +
    +		@Override
    +		public void cancel() {
    +			isRunning = false;
    +		}
    +
    +		@Override
    +		public void restoreState(String state) throws Exception {
    +			restoredState = state;
    +		}
    +	}
    +
    +	/**
    +	 * Pass-through flat map used when creating the savepoint. It snapshots a fixed tuple through
    +	 * the legacy {@link Checkpointed} interface.
    +	 */
    +	public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
    +			implements Checkpointed<Tuple2<String, Long>> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		/** The value written into every snapshot; CheckingRestoringFlatMap checks against it. */
    +		public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
    +				new Tuple2<>("hello", 42L);
    +
    +		@Override
    +		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
    +			out.collect(value);
    +		}
    +
    +		@Override
    +		public void restoreState(Tuple2<String, Long> state) throws Exception {
    +			// intentionally empty: this function only produces the savepoint; the restored value
    +			// is verified by CheckingRestoringFlatMap in the restore tests
    +		}
    +
    +		@Override
    +		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    +			return CHECKPOINTED_TUPLE;
    +		}
    +	}
    +
    +	/**
    +	 * Restore-side counterpart of {@link LegacyCheckpointedFlatMap}. It forwards every element
    +	 * unchanged. For each element it then asserts that the tuple restored via
    +	 * {@link CheckpointedRestoring} equals {@link LegacyCheckpointedFlatMap#CHECKPOINTED_TUPLE},
    +	 * and counts each successful check in an accumulator.
    +	 */
    +	public static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
    +			implements CheckpointedRestoring<Tuple2<String, Long>> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMap.class + "_RESTORE_CHECK";
    +
    +		/** Tuple handed to {@link #restoreState}; {@code null} until a restore happens. */
    +		private transient Tuple2<String, Long> restoredTuple;
    +
    +		@Override
    +		public void restoreState(Tuple2<String, Long> restored) throws Exception {
    +			restoredTuple = restored;
    +		}
    +
    +		@Override
    +		public void open(Configuration config) throws Exception {
    +			super.open(config);
    +			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
    +		}
    +
    +		@Override
    +		public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Long>> collector) throws Exception {
    +			// forward first, then verify the restored state and record the successful check
    +			collector.collect(element);
    +			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredTuple);
    +			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
    +		}
    +	}
    +
    +	public static class LegacyCheckpointedFlatMapWithKeyedState
    +			extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
    +			implements Checkpointed<Tuple2<String, Long>> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
    +				new Tuple2<>("hello", 42L);
    +
    +		private final ValueStateDescriptor<Long> stateDescriptor =
    +				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
    +
    +		@Override
    +		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>>
out) throws Exception {
    +			out.collect(value);
    +
    +			getRuntimeContext().getState(stateDescriptor).update(value.f1);
    +
    +			assertEquals(value.f1, getRuntimeContext().getState(stateDescriptor).value());
    --- End diff --
    
    Isn't this always true? We just set the value after all.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message