Date: Thu, 22 Feb 2018 10:56:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-8735) Add savepoint migration ITCase that covers operator state

    [ https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372655#comment-16372655 ]

ASF GitHub Bot commented on FLINK-8735:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5552#discussion_r169919922

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to cover
+ * migrating for multiple previous Flink versions, as well as for different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
+
+    private static final int NUM_SOURCE_ELEMENTS = 4;
+
+    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+    public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+        return Arrays.asList(
+            Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+            Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+    }
+
+    /**
+     * TODO to generate savepoints for a specific Flink version / backend type,
+     * TODO change these values accordingly, e.g. to generate for 1.4 with RocksDB,
+     * TODO set as (MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
+     */
+    private final MigrationVersion flinkGenerateSavepointVersion = MigrationVersion.v1_4;
+    private final String flinkGenerateSavepointBackendType = StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
+
+    private final MigrationVersion testMigrateVersion;
+    private final String testStateBackend;
+
+    public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) {
+        this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+        this.testStateBackend = testMigrateVersionAndBackend.f1;
+    }
+
+    /**
+     * Manually run this to write binary snapshot data.
+     */
+    @Test
+    @Ignore
+    public void writeSavepoint() throws Exception {
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        switch (flinkGenerateSavepointBackendType) {
+            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+                break;
+            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+                env.setStateBackend(new MemoryStateBackend());
+                break;
+            default:
+                throw new UnsupportedOperationException();
+        }
+
+        env.enableCheckpointing(500);
+        env.setParallelism(4);
+        env.setMaxParallelism(4);
+
+        env
+            .addSource(new CheckpointedNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS)).uid("CheckpointedSource1")
+            .keyBy(0)
+            .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap1")
+            .keyBy(0)
+            .transform(
+                "timely_stateful_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new TimelyStatefulOperator()).uid("TimelyStatefulOperator1")
+            .addSink(new AccumulatorCountingSink<>());
+
+        env
+            .addSource(new CheckpointedParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS)).uid("CheckpointedSource2")
+            .keyBy(0)
+            .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap2")
+            .keyBy(0)
+            .transform(
+                "timely_stateful_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new TimelyStatefulOperator()).uid("TimelyStatefulOperator2")
+            .addSink(new AccumulatorCountingSink<>());
+
+        executeAndSavepoint(
+            env,
+            "src/test/resources/" + getSavepointPath(flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType),
+            new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2));
+    }
+
+    @Test
+    public void testSavepointRestore() throws Exception {
+
+        final int parallelism = 4;
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        switch (testStateBackend) {
+            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+                break;
+            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+                env.setStateBackend(new MemoryStateBackend());
+                break;
+            default:
+                throw new UnsupportedOperationException();
+        }
+
+        env.enableCheckpointing(500);
+        env.setParallelism(parallelism);
+        env.setMaxParallelism(parallelism);
+
+        env
+            .addSource(new CheckingRestoringNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS)).uid("CheckpointedSource1")
+            .keyBy(0)
+            .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap1")
+            .keyBy(0)
+            .transform(
+                "timely_stateful_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator1")
+            .addSink(new AccumulatorCountingSink<>());
+
+        env
+            .addSource(new CheckingRestoringParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS)).uid("CheckpointedSource2")
+            .keyBy(0)
+            .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap2")
+            .keyBy(0)
+            .transform(
+                "timely_stateful_operator",
+                new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+                new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator2")
+            .addSink(new AccumulatorCountingSink<>());
+
+        restoreAndExecute(
+            env,
+            getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
+            new Tuple2<>(CheckingRestoringNonParallelSourceWithListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
+            new Tuple2<>(CheckingRestoringParallelSourceWithUnionListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, parallelism),
+            new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2),
+            new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2),
+            new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2));
+    }
+
+    private String getSavepointPath(MigrationVersion savepointVersion, String backendType) {
+        switch (backendType) {
+            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+                return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-rocksdb-savepoint";
+            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+                return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint";
+            default:
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class CheckpointedNonParallelSourceWithListState
+            implements SourceFunction<Tuple2<Long, Long>>, CheckpointedFunction {
+
+        static final ListStateDescriptor<String> stateDescriptor =
+            new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);
+
+        static final String checkpointedString = "Here be dragons!";
+        static final String checkpointedString1 = "Here be more dragons!";
+        static final String checkpointedString2 = "Here be yet more dragons!";
+        static final String checkpointedString3 = "Here be the mostest dragons!";
+
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean isRunning = true;
+
+        private final int numElements;
+
+        private transient ListState<String> unionListState;
+
+        public CheckpointedNonParallelSourceWithListState(int numElements) {
+            this.numElements = numElements;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            unionListState.clear();
+            unionListState.add(checkpointedString);
+            unionListState.add(checkpointedString1);
+            unionListState.add(checkpointedString2);
+            unionListState.add(checkpointedString3);
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            unionListState = context.getOperatorStateStore().getListState(
+                stateDescriptor);
+        }
+
+        @Override
+        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+            ctx.emitWatermark(new Watermark(0));
+
+            synchronized (ctx.getCheckpointLock()) {
+                for (long i = 0; i < numElements; i++) {
+                    ctx.collect(new Tuple2<>(i, i));
+                }
+            }
+
+            // don't emit a final watermark so that we don't trigger the registered event-time
+            // timers
+            while (isRunning) {
+                Thread.sleep(20);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+    }
+
+    private static class CheckingRestoringNonParallelSourceWithListState
+            extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
+
+        private static final long serialVersionUID = 1L;
+
+        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringNonParallelSourceWithListState.class + "_RESTORE_CHECK";
+
+        private volatile boolean isRunning = true;
+
+        private final int numElements;
+
+        public CheckingRestoringNonParallelSourceWithListState(int numElements) {
+            this.numElements = numElements;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            ListState<String> unionListState = context.getOperatorStateStore().getListState(
+                CheckpointedNonParallelSourceWithListState.stateDescriptor);
+
+            if (context.isRestored()) {
+                assertThat(unionListState.get(),
+                    containsInAnyOrder(
+                        CheckpointedNonParallelSourceWithListState.checkpointedString,
+                        CheckpointedNonParallelSourceWithListState.checkpointedString1,
+                        CheckpointedNonParallelSourceWithListState.checkpointedString2,
+                        CheckpointedNonParallelSourceWithListState.checkpointedString3));
+
+                getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+                getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+            } else {
+                throw new RuntimeException(
+                    "This source should always be restored because it's only used when restoring from a savepoint.");
+            }
+        }
+
+        @Override
+        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+            // immediately trigger any set timers
+            ctx.emitWatermark(new Watermark(1000));
+
+            synchronized (ctx.getCheckpointLock()) {
+                for (long i = 0; i < numElements; i++) {
+                    ctx.collect(new Tuple2<>(i, i));
+                }
+            }
+
+            while (isRunning) {
+                Thread.sleep(20);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+    }
+
+    private static class CheckpointedParallelSourceWithUnionListState
+            extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
+
+        static final ListStateDescriptor<String> stateDescriptor =
+            new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);
+
+        static final String[] checkpointedStrings = {
+            "Here be dragons!",
+            "Here be more dragons!",
+            "Here be yet more dragons!",
+            "Here be the mostest dragons!"
+        };
+
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean isRunning = true;
+
+        private final int numElements;
+
+        private transient ListState<String> unionListState;
+
+        public CheckpointedParallelSourceWithUnionListState(int numElements) {
+            this.numElements = numElements;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            unionListState.clear();
+
+            for (String s : checkpointedStrings) {
+                if (s.hashCode() % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) {
--- End diff --

But this would only work if the length of the list matches the parallelism. It does here, but I wanted to be more general.


> Add savepoint migration ITCase that covers operator state
> ----------------------------------------------------------
>
>                 Key: FLINK-8735
>                 URL: https://issues.apache.org/jira/browse/FLINK-8735
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.2
>
>
> The current {{StatefulJobSavepointMigrationITCase}} does not cover operator state, meaning state accessed using {{OperatorStateStore}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
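
For reference on the "more general" splitting discussed in the review comment above: one parallelism-agnostic alternative is to assign list entries to subtasks round-robin by index. This is a minimal sketch against the same fields and RuntimeContext calls used in the quoted diff, not code from the PR; unlike hashCode() % n, an index-based split is never negative (Java's % keeps the sign of the dividend, and hashCode() can be negative) and spreads entries evenly for any list length.

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        unionListState.clear();

        int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

        for (int i = 0; i < checkpointedStrings.length; i++) {
            // Entry i goes to subtask (i % numSubtasks): the index is never
            // negative, and the entries spread as evenly as possible whether
            // the list is shorter or longer than the parallelism.
            if (i % numSubtasks == subtaskIndex) {
                unionListState.add(checkpointedStrings[i]);
            }
        }
    }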