From: aljoscha@apache.org
To: commits@flink.apache.org
Date: Tue, 20 Dec 2016 15:09:20 -0000
Subject: [05/15] flink git commit: [FLINK-5366] SavepointUtil into SavepointMigrationTestBase/Add Test

[FLINK-5366] SavepointUtil into SavepointMigrationTestBase/Add Test

This also changes how the savepoint is performed: we now wait on
accumulators to signal that a job is ready for savepointing.
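The accumulator-based readiness signal mentioned above is implemented by the AccumulatorCountingSink added in this commit: each task registers a named IntCounter and increments it per record, and the test harness polls the job's accumulators until the expected counts are reached. A minimal job-side sketch of the pattern (a simplified mirror of the sink in the diff below, shown for orientation only, not additional committed code):

    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Each parallel instance registers the same named accumulator; Flink merges
    // the per-subtask values, so the harness observes the global element count.
    public class CountingSink<T> extends RichSinkFunction<T> {
        private static final long serialVersionUID = 1L;
        private final String accumulatorName;

        public CountingSink(String accumulatorName) {
            this.accumulatorName = accumulatorName;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator(accumulatorName, new IntCounter());
        }

        @Override
        public void invoke(T value) throws Exception {
            // One tick per record: once this reaches the expected total, the
            // harness knows the job has processed all input and can savepoint it.
            getRuntimeContext().getAccumulator(accumulatorName).add(1);
        }
    }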
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cbd9f5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cbd9f5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cbd9f5d

Branch: refs/heads/master
Commit: 2cbd9f5d1ba43059b8bf748f97d2392b1e8f0ab3
Parents: 74df763
Author: Aljoscha Krettek
Authored: Mon Dec 19 12:18:49 2016 +0100
Committer: Aljoscha Krettek
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../utils/SavepointMigrationTestBase.java       | 241 ++++
 .../test/checkpointing/utils/SavepointUtil.java | 341 -----
 .../StatefulUDFSavepointMigrationITCase.java    | 562 +++++++++++++++++++
 .../utils/UserFunctionStateJob.java             | 113 ----
 ...eful-udf-migration-itcase-flink1.1-savepoint | Bin 0 -> 27146 bytes
 ...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 0 -> 22283 bytes
 6 files changed, 803 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
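Before the diff itself: the core of the new SavepointMigrationTestBase is a deadline-bounded poll of the job's accumulators. A dependency-free sketch of just that polling logic follows; fetchAccumulators is an illustrative stub standing in for ClusterClient#getAccumulators, and all names here are hypothetical:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class DeadlinePollSketch {

        // Illustrative stand-in for ClusterClient#getAccumulators(jobId);
        // not a real Flink call.
        static Map<String, Object> fetchAccumulators() {
            Map<String, Object> acc = new HashMap<>();
            acc.put("NUM_EXPECTED_ELEMENTS", 4);
            return acc;
        }

        public static void main(String[] args) throws InterruptedException {
            final long deadlineNanos = System.nanoTime() + TimeUnit.MINUTES.toNanos(5);
            boolean done = false;
            while (System.nanoTime() < deadlineNanos) {
                Thread.sleep(100);
                Integer value = (Integer) fetchAccumulators().get("NUM_EXPECTED_ELEMENTS");
                if (value != null && value.equals(4)) {
                    done = true; // the job reported readiness through its accumulator
                    break;
                }
            }
            if (!done) {
                throw new AssertionError("Did not see the expected accumulator value in time.");
            }
        }
    }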

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
new file mode 100644
index 0000000..80a66ac
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static junit.framework.Assert.fail;
+
+public class SavepointMigrationTestBase extends TestBaseUtils {
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
+    private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+    protected static final int DEFAULT_PARALLELISM = 4;
+    protected LocalFlinkMiniCluster cluster = null;
+
+    protected static String getResourceFilename(String filename) {
+        ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
+        URL resource = cl.getResource(filename);
+        if (resource == null) {
+            throw new NullPointerException("Missing snapshot resource.");
+        }
+        return resource.getFile();
+    }
+
+    @Before
+    public void setup() throws Exception {
+
+        // Flink configuration
+        final Configuration config = new Configuration();
+
+        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+
+        final File checkpointDir = tempFolder.newFolder("checkpoints").getAbsoluteFile();
+        final File savepointDir = tempFolder.newFolder("savepoints").getAbsoluteFile();
+
+        if (!checkpointDir.exists() || !savepointDir.exists()) {
+            throw new Exception("Test setup failed: failed to create (temporary) directories.");
+        }
+
+        LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
+        LOG.info("Created savepoint directory: " + savepointDir + ".");
+
+        config.setString(ConfigConstants.STATE_BACKEND, "memory");
+        config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+        config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
+        config.setString("state.savepoints.dir", savepointDir.toURI().toString());
+
+        cluster = TestBaseUtils.startCluster(config, false);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+    }
+
+    protected void executeAndSavepoint(
+            StreamExecutionEnvironment env,
+            String savepointPath,
+            Tuple2<String, Integer>...
expectedAccumulators) throws Exception {
+
+        // Retrieve the job manager
+        ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+
+        // Submit the job
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+        JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
+
+        LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());
+
+        StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+
+        boolean done = false;
+        while (DEADLINE.hasTimeLeft()) {
+            Thread.sleep(100);
+            Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+
+            boolean allDone = true;
+            for (Tuple2<String, Integer> acc : expectedAccumulators) {
+                Integer numFinished = (Integer) accumulators.get(acc.f0);
+                if (numFinished == null) {
+                    allDone = false;
+                    break;
+                }
+                if (!numFinished.equals(acc.f1)) {
+                    allDone = false;
+                    break;
+                }
+            }
+            if (allDone) {
+                done = true;
+                break;
+            }
+        }
+
+        if (!done) {
+            fail("Did not see the expected accumulator results within time limit.");
+        }
+
+        LOG.info("Triggering savepoint.");
+        // Flink 1.2
+        final Future<Object> savepointResultFuture =
+                jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), Option.<String>empty()), DEADLINE.timeLeft());
+
+        // Flink 1.1
+//        final Future<Object> savepointResultFuture =
+//                jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID()), DEADLINE.timeLeft());
+
+        Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
+
+        if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
+            fail("Error drawing savepoint: " + ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause());
+        }
+
+        // jobmanager will store savepoint in heap, we have to retrieve it
+        final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
+        LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
+
+        // Flink 1.2
+        FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
+
+        // Flink 1.1
+        // Retrieve the savepoint from the testing job manager
+//        LOG.info("Requesting the savepoint.");
+//        Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(jobmanagerSavepointPath), DEADLINE.timeLeft());
+//
+//        Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
+//        LOG.info("Retrieved savepoint: " + jobmanagerSavepointPath + ".");
+//
+//        LOG.info("Storing savepoint to file.");
+//        Configuration config = new Configuration();
+//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file:///Users/aljoscha/Downloads");
+//        String path = org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
+//
+//        FileUtils.moveFile(new File(new URI(path).getPath()), new File(savepointPath));
+    }
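// ----------------------------------------------------------------------
// Usage note (illustrative annotation, not part of the committed file):
// subclasses drive this base class in two phases. An @Ignore'd test is run
// once, manually, against the old Flink version, e.g.
//
//     executeAndSavepoint(env, "src/test/resources/my-savepoint",
//         new Tuple2<>("NUM_EXPECTED_ELEMENTS", NUM_SOURCE_ELEMENTS));
//
// which produces the binary savepoint committed as a test resource; the
// regular test run then calls restoreAndExecute(...) below to verify that
// the savepoint restores on the current version. Names mirror the ITCase
// added later in this commit.
// ----------------------------------------------------------------------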
+
+    protected void restoreAndExecute(
+            StreamExecutionEnvironment env,
+            String savepointPath,
+            Tuple2<String, Integer>... expectedAccumulators) throws Exception {
+
+        int parallelism = env.getParallelism();
+
+        // Retrieve the job manager
+        ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+
+        // Submit the job
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+
+        JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
+
+        StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+
+        boolean done = false;
+        while (DEADLINE.hasTimeLeft()) {
+            Thread.sleep(100);
+            Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+
+            boolean allDone = true;
+            for (Tuple2<String, Integer> acc : expectedAccumulators) {
+                Integer numFinished = (Integer) accumulators.get(acc.f0);
+                if (numFinished == null) {
+                    System.out.println("NO ACC FOR " + acc);
+                    allDone = false;
+                    break;
+                }
+                if (!numFinished.equals(acc.f1)) {
+                    System.out.println("TOO LOW FOR ACC " + acc);
+                    allDone = false;
+                    break;
+                }
+            }
+            System.out.println("ACC: " + accumulators);
+            if (allDone) {
+                done = true;
+                break;
+            }
+        }
+
+        if (!done) {
+            fail("Did not see the expected accumulator results within time limit.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
deleted file mode 100644
index 85e21c5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.flink.test.checkpointing.utils; - -import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.TimeUnit; -import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; -import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -public class SavepointUtil { - - // list of JobGraphs to create savepoints for - private static final ArrayList> savepointJobs = new ArrayList<>(); - static { - savepointJobs.add(UserFunctionStateJob.class); - } - - private static final Logger LOG = LoggerFactory.getLogger(SavepointUtil.class); - private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); - private static final String SAVEPOINT_BASE_DIR = "./flink-tests/src/test/resources/savepoints/"; - - private static final int STATE_WAIT_FOR_JOB = 0; - private static final int STATE_REQUEST_SAVEPOINT = 1; - private static final int STATE_SAVEPOINT_DONE = 2; - private static final int STATE_WAIT_FOR_TEST_JOB = 3; - private static final int STATE_TEST_RESULT = 4; - private static final int STATE_END = 5; - - private static volatile int state = STATE_WAIT_FOR_JOB; - - private static TestingCluster flink = null; - private static ActorGateway jobManager = null; - private static JobID jobId = null; - private static File savepointDir = null; - private static Exception testResult = null; - - public static void main(String[] args) throws Exception { - - // clean up -// FileUtils.deleteDirectory(new File(SAVEPOINT_BASE_DIR)); - - for (Class testJob : savepointJobs) { - SavepointTestJob job = testJob.newInstance(); - -// runJobAndCreateSavepoint(job); - - runJobAndCompareState(job); - - triggerEndOfTest(); - } - } - - public static synchronized void triggerSavepoint() { - SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT; - } - - public static synchronized boolean allowStateChange() { - return SavepointUtil.state < SavepointUtil.STATE_REQUEST_SAVEPOINT; - } - - public static synchronized void triggerOrTestSavepoint(RichFunction function, Object expected, Object actual) throws Exception { - if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) { - if (expected.equals(actual)) { - LOG.info("Test was successful."); - SavepointUtil.testResult = null; - SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT; - } else { - LOG.info("Test failed."); - SavepointUtil.testResult = new Exception("Comparison of state failed. 
Expected: " + expected + " but was: " + actual); - SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT; - } - } else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) { - final StateCondition condition = new StateCondition(function.getClass(), function.getRuntimeContext().getIndexOfThisSubtask()); - if (!testCounters.containsKey(condition)) { - testCounters.put(condition, 0); - } - final Integer counter = testCounters.get(condition); - testCounters.put(condition, counter + 1); - // check if all counters are ready - if (checkIfReadyForSavepoint()) { - SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT; - } - } - } - - public static void triggerEndOfTest() throws Exception { - LOG.info("Cancelling Flink."); - if (flink != null) { - flink.stop(); - } - SavepointUtil.state = SavepointUtil.STATE_END; - } - - public static void runJobAndCreateSavepoint(SavepointTestJob job) throws Exception { - LOG.info("Waiting for job."); - SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_JOB; - - final Thread t = new Thread(new SavepointPerformer()); - t.start(); - - runJob(job); - - while(SavepointUtil.state != SavepointUtil.STATE_SAVEPOINT_DONE && DEADLINE.hasTimeLeft()) { - Thread.sleep(100); - } - } - - public static void runJobAndCompareState(SavepointTestJob job) throws Exception { - LOG.info("Waiting for test job."); - SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_TEST_JOB; - - runJob(job); - - while(SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT && DEADLINE.hasTimeLeft()) { - Thread.sleep(100); - } - - if (SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT) { - throw new Exception("No test result available."); - } - if (testResult != null) { - throw testResult; - } - } - - public static void setTestResult(Exception e) { - SavepointUtil.testResult = e; - SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT; - } - - // -------------------------------------------------------------------------------------------- - - private static void runJob(SavepointTestJob job) throws Exception { - // Config - int numTaskManagers = 2; - int numSlotsPerTaskManager = 2; - int parallelism = numTaskManagers * numSlotsPerTaskManager; - String savepointPath = SAVEPOINT_BASE_DIR + job.getClass().getSimpleName(); - - // Flink configuration - final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); - - final File checkpointDir = File.createTempFile("checkpoints", Long.toString(System.nanoTime())); - savepointDir = new File(savepointPath); - savepointDir.mkdirs(); - - if (!checkpointDir.exists() || !savepointDir.exists()) { - throw new Exception("Test setup failed: failed to create (temporary) directories."); - } - - LOG.info("Created temporary checkpoint directory: " + checkpointDir + "."); - LOG.info("Created savepoint directory: " + savepointDir + "."); - - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); - config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0"); - config.setString("state.savepoints.dir", savepointDir.toURI().toString()); - - LOG.info("Flink configuration: " + config + "."); - - // Start Flink - flink = new TestingCluster(config); - flink.start(); - - // Retrieve the job manager - jobManager = Await.result(flink.leaderGateway().future(), DEADLINE.timeLeft()); - - // 
Submit the job - final JobGraph jobGraph = job.createJobGraph(); - if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) { - savepointCondition = job.getSavepointCondition(); - testCounters.clear(); - } else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) { - final File[] dir = savepointDir.listFiles(); - if (dir.length == 0) { - throw new RuntimeException("Savepoint of " + job.getClass().getSimpleName() + " does not exist."); - } - jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(dir[0].getAbsolutePath())); - } - jobId = jobGraph.getJobID(); - - LOG.info("Submitting job " + jobGraph.getJobID() + " and waiting..."); - - flink.submitJobDetached(jobGraph); - } - - private static final HashMap testCounters = new HashMap<>(); - private static SavepointCondition[] savepointCondition = null; - - private static boolean checkIfReadyForSavepoint() { - for (SavepointCondition condition : savepointCondition) { - final StateCondition stateCondition = new StateCondition(condition.clazz, condition.subtask); - if (!testCounters.containsKey(stateCondition) || testCounters.get(stateCondition) != condition.invocation) { - return false; - } - } - return true; - } - - private static void performSavepointAndShutdown() throws Exception { - LOG.info("Triggering a savepoint."); - - // Flink 1.2 - final Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.empty()), DEADLINE.timeLeft()); - // Flink 1.1 -// final Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), DEADLINE.timeLeft()); - - final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(savepointPathFuture, DEADLINE.timeLeft())).savepointPath(); - LOG.info("Saved savepoint: " + savepointPath); - - // Retrieve the savepoint from the testing job manager - LOG.info("Requesting the savepoint."); - Future savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), DEADLINE.timeLeft()); - - Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint(); - LOG.info("Retrieved savepoint: " + savepointPath + "."); - - LOG.info("Storing savepoint to file."); - - // Flink 1.2 - // it might be that the savepoint has already been written to file in Flink 1.2 - // this is just the command how to do it in 1.2 -// org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepoint(savepointDir.getAbsolutePath(), savepoint); - // Flink 1.1 - // this writes it for FLink 1.1 -// Configuration config = new Configuration(); -// config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); -// config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file://" + savepointDir.getAbsolutePath()); -// org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint); - - LOG.info("Cancelling Flink."); - flink.stop(); - - SavepointUtil.state = SavepointUtil.STATE_SAVEPOINT_DONE; - } - - private static class StateCondition { - private Class clazz; - private Integer subtask; - - StateCondition(Class clazz, Integer subtask) { - this.clazz = clazz; - this.subtask = subtask; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - StateCondition that = 
(StateCondition) o; - - return clazz.equals(that.clazz) && subtask.equals(that.subtask); - } - - @Override - public int hashCode() { - int result = clazz.hashCode(); - result = 31 * result + subtask.hashCode(); - return result; - } - } - - public static class SavepointCondition { - Class clazz; - int subtask; - int invocation; - - SavepointCondition(Class clazz, int subtask, int invocation) { - this.clazz = clazz; - this.subtask = subtask; - this.invocation = invocation; - } - } - - public interface SavepointTestJob { - JobGraph createJobGraph(); - - SavepointCondition[] getSavepointCondition(); - } - - private static class SavepointPerformer implements Runnable { - - @Override - public void run() { - try { - while (SavepointUtil.state != SavepointUtil.STATE_END) { - Thread.sleep(100); - if (SavepointUtil.state == SavepointUtil.STATE_REQUEST_SAVEPOINT) { - try { - performSavepointAndShutdown(); - } catch (Exception e) { - throw new RuntimeException("Performing savepoint failed.", e); - } - } - } - } catch (InterruptedException e) { - // stop execution - } - LOG.info("SavepointPerformer Thread finished."); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java new file mode 100644 index 0000000..cc21683 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * This verifies that we can restore a complete job from a Flink 1.1 savepoint. + * + *

The test pipeline contains both "Checkpointed" state and keyed user state. + */ +public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestBase { + private static final int NUM_SOURCE_ELEMENTS = 4; + private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS"; + private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS"; + + /** + * This has to be manually executed to create the savepoint on Flink 1.1. + */ + @Test + @Ignore + public void testCreateSavepointOnFlink11() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new MemoryStateBackend()); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + // create source + env + .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint>() {}.getTypeInfo(), + new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") + .addSink(new AccumulatorCountingSink>(EXPECTED_ELEMENTS_ACCUMULATOR)); + + executeAndSavepoint( + env, + "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint", + new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + /** + * This has to be manually executed to create the savepoint on Flink 1.1. 
+ */ + @Test + @Ignore + public void testCreateSavepointOnFlink11WithRocksDB() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + RocksDBStateBackend rocksBackend = + new RocksDBStateBackend(new MemoryStateBackend()); +// rocksBackend.enableFullyAsyncSnapshots(); + env.setStateBackend(rocksBackend); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + // create source + env + .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint>() {}.getTypeInfo(), + new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") + .addSink(new AccumulatorCountingSink>(EXPECTED_ELEMENTS_ACCUMULATOR)); + + executeAndSavepoint( + env, + "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb", + new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + + @Test + public void testSavepointRestoreFromFlink11() throws Exception { + + final int EXPECTED_SUCCESSFUL_CHECKS = 21; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new MemoryStateBackend()); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + // create source + env + .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint>() {}.getTypeInfo(), + new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") + .addSink(new AccumulatorCountingSink>(EXPECTED_ELEMENTS_ACCUMULATOR)); + + restoreAndExecute( + env, + getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"), + new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS)); + } + + @Test + public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception { + + final int EXPECTED_SUCCESSFUL_CHECKS = 21; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + // create source + env + .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new 
RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint>() {}.getTypeInfo(), + new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") + .addSink(new AccumulatorCountingSink>(EXPECTED_ELEMENTS_ACCUMULATOR)); + + restoreAndExecute( + env, + getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"), + new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS)); + } + + private static class LegacyCheckpointedSource + implements SourceFunction>, Checkpointed { + + public static String CHECKPOINTED_STRING = "Here be dragons!"; + + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + + private final int numElements; + + public LegacyCheckpointedSource(int numElements) { + this.numElements = numElements; + } + + @Override + public void run(SourceContext> ctx) throws Exception { + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, i)); + } + } + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void restoreState(String state) throws Exception { + assertEquals(CHECKPOINTED_STRING, state); + } + + @Override + public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_STRING; + } + } + + private static class RestoringCheckingSource + extends RichSourceFunction> + implements CheckpointedRestoring { + + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + + private final int numElements; + + private String restoredState; + + public RestoringCheckingSource(int numElements) { + this.numElements = numElements; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void run(SourceContext> ctx) throws Exception { + assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, i)); + } + } + + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void restoreState(String state) throws Exception { + restoredState = state; + } + } + + public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction, Tuple2> + implements Checkpointed> { + + private static final long serialVersionUID = 1L; + + public static Tuple2 CHECKPOINTED_TUPLE = + new Tuple2<>("hello", 42L); + + @Override + public void flatMap(Tuple2 value, Collector> out) throws Exception { + out.collect(value); + } + + @Override + public void restoreState(Tuple2 state) throws Exception { + } + + @Override + public Tuple2 snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_TUPLE; + } + } + + public static class RestoringCheckingFlatMap extends 
RichFlatMapFunction, Tuple2> + implements CheckpointedRestoring> { + + private static final long serialVersionUID = 1L; + + private transient Tuple2 restoredState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2 value, Collector> out) throws Exception { + out.collect(value); + + assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); + + } + + @Override + public void restoreState(Tuple2 state) throws Exception { + restoredState = state; + } + } + + public static class LegacyCheckpointedFlatMapWithKeyedState + extends RichFlatMapFunction, Tuple2> + implements Checkpointed> { + + private static final long serialVersionUID = 1L; + + public static Tuple2 CHECKPOINTED_TUPLE = + new Tuple2<>("hello", 42L); + + private final ValueStateDescriptor stateDescriptor = + new ValueStateDescriptor("state-name", LongSerializer.INSTANCE, null); + + @Override + public void flatMap(Tuple2 value, Collector> out) throws Exception { + out.collect(value); + + getRuntimeContext().getState(stateDescriptor).update(value.f1); + } + + @Override + public void restoreState(Tuple2 state) throws Exception { + } + + @Override + public Tuple2 snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_TUPLE; + } + } + + public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction, Tuple2> + implements CheckpointedRestoring> { + + private static final long serialVersionUID = 1L; + + private transient Tuple2 restoredState; + + private final ValueStateDescriptor stateDescriptor = + new ValueStateDescriptor("state-name", LongSerializer.INSTANCE, null); + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2 value, Collector> out) throws Exception { + out.collect(value); + + ValueState state = getRuntimeContext().getState(stateDescriptor); + if (state == null) { + throw new RuntimeException("Missing key value state for " + value); + } + + assertEquals(value.f1, state.value()); + assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void restoreState(Tuple2 state) throws Exception { + restoredState = state; + } + } + + public static class KeyedStateSettingFlatMap extends RichFlatMapFunction, Tuple2> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor stateDescriptor = + new ValueStateDescriptor("state-name", LongSerializer.INSTANCE, null); + + @Override + public void flatMap(Tuple2 value, Collector> out) throws Exception { + out.collect(value); + + getRuntimeContext().getState(stateDescriptor).update(value.f1); + } + } + + public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction, Tuple2> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor stateDescriptor = + new ValueStateDescriptor("state-name", LongSerializer.INSTANCE, null); + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + 
getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2 value, Collector> out) throws Exception { + out.collect(value); + + ValueState state = getRuntimeContext().getState(stateDescriptor); + if (state == null) { + throw new RuntimeException("Missing key value state for " + value); + } + + assertEquals(value.f1, state.value()); + getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); + } + } + + public static class CheckpointedUdfOperator + extends AbstractUdfStreamOperator, FlatMapFunction, Tuple2>> + implements OneInputStreamOperator, Tuple2> { + private static final long serialVersionUID = 1L; + + private static final String CHECKPOINTED_STRING = "Oh my, that's nice!"; + + public CheckpointedUdfOperator(FlatMapFunction, Tuple2> userFunction) { + super(userFunction); + } + + @Override + public void processElement(StreamRecord> element) throws Exception { + output.collect(element); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + + // Flink 1.1 +// @Override +// public StreamTaskState snapshotOperatorState( +// long checkpointId, long timestamp) throws Exception { +// StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp); +// +// AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView( +// checkpointId, +// timestamp); +// +// out.writeUTF(CHECKPOINTED_STRING); +// +// result.setOperatorState(out.closeAndGetHandle()); +// +// return result; +// } + } + + public static class RestoringCheckingUdfOperator + extends AbstractUdfStreamOperator, FlatMapFunction, Tuple2>> + implements OneInputStreamOperator, Tuple2> { + private static final long serialVersionUID = 1L; + + private String restoredState; + + public RestoringCheckingUdfOperator(FlatMapFunction, Tuple2> userFunction) { + super(userFunction); + } + + @Override + public void open() throws Exception { + super.open(); + } + + @Override + public void processElement(StreamRecord> element) throws Exception { + userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output)); + + assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + + @Override + public void restoreState(FSDataInputStream in) throws Exception { + super.restoreState(in); + + DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in); + + restoredState = streamWrapper.readUTF(); + } + } + + public static class AccumulatorCountingSink extends RichSinkFunction { + private static final long serialVersionUID = 1L; + + private final String accumulatorName; + + int count = 0; + + public AccumulatorCountingSink(String accumulatorName) { + this.accumulatorName = accumulatorName; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(accumulatorName, new IntCounter()); + } + + @Override + public void invoke(T value) throws Exception { + count++; + getRuntimeContext().getAccumulator(accumulatorName).add(1); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java 
---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java deleted file mode 100644 index 1df7938..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.checkpointing.utils; - -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointCondition; -import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointTestJob; -import org.apache.flink.util.Collector; - -public class UserFunctionStateJob implements SavepointTestJob { - - @Override - public JobGraph createJobGraph() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // we only test memory state backend yet - env.setStateBackend(new MemoryStateBackend()); - env.enableCheckpointing(500); - env.setParallelism(1); - env.setMaxParallelism(1); - - // create source - final DataStream> source = env - .addSource(new SourceFunction>() { - - private volatile boolean isRunning = true; - - @Override - public void run(SourceContext> ctx) throws Exception { - while (isRunning) { - synchronized (ctx.getCheckpointLock()) { - ctx.collect(new Tuple2<>(1L, 1L)); - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - }).uid("CustomSourceFunction"); - - // non-keyed operator state - source.flatMap(new SumFlatMapperNonKeyedCheckpointed()).uid("SumFlatMapperNonKeyedCheckpointed").startNewChain().print(); - - return env.getStreamGraph().getJobGraph(); - } - - @Override - public SavepointCondition[] getSavepointCondition() { - return new SavepointCondition[] { - new SavepointCondition(SumFlatMapperNonKeyedCheckpointed.class, 0, 4) - }; - } - - public static class SumFlatMapperNonKeyedCheckpointed extends RichFlatMapFunction, Tuple2> - implements Checkpointed> { - - private transient Tuple2 sum; - - @Override - public 
void restoreState(Tuple2 state) throws Exception { - sum = state; - } - - @Override - public void flatMap(Tuple2 value, Collector> out) throws Exception { - if (SavepointUtil.allowStateChange()) { - if (sum == null) { - sum = value; - out.collect(sum); - } else { - sum.f1 += value.f1; - out.collect(sum); - } - } - - SavepointUtil.triggerOrTestSavepoint( - this, - new Tuple2<>(value.f1, value.f1 * 4), - sum); - } - - @Override - public Tuple2 snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return sum; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint new file mode 100644 index 0000000..f2f6dcd Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint differ http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb new file mode 100644 index 0000000..e63038b Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb differ
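To see how the pieces above fit together, a hypothetical subclass of the new base class could look like the sketch below. The class names MyMigrationITCase and IdlingSource, the topology, and the savepoint path are invented for illustration; SavepointMigrationTestBase, executeAndSavepoint, restoreAndExecute, getResourceFilename, and AccumulatorCountingSink come from the files in this commit.

    package org.apache.flink.test.checkpointing.utils;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.junit.Ignore;
    import org.junit.Test;

    public class MyMigrationITCase extends SavepointMigrationTestBase {

        private static final String ACC = "NUM_ELEMENTS";
        private static final int NUM_ELEMENTS = 4;

        @Test
        @Ignore // run manually on the old Flink version to (re)create the savepoint file
        public void createSavepoint() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(new IdlingSource(NUM_ELEMENTS))
                .addSink(new StatefulUDFSavepointMigrationITCase.AccumulatorCountingSink<Long>(ACC));
            // Blocks until the accumulator reaches NUM_ELEMENTS, then triggers
            // the savepoint and moves it to the given path.
            executeAndSavepoint(env, "src/test/resources/my-savepoint", new Tuple2<>(ACC, NUM_ELEMENTS));
        }

        @Test
        public void restoreSavepoint() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(new IdlingSource(NUM_ELEMENTS))
                .addSink(new StatefulUDFSavepointMigrationITCase.AccumulatorCountingSink<Long>(ACC));
            restoreAndExecute(env, getResourceFilename("my-savepoint"), new Tuple2<>(ACC, NUM_ELEMENTS));
        }

        // Emits its elements, then idles so the job keeps running and can be
        // savepointed (mirrors LegacyCheckpointedSource in the ITCase above).
        private static class IdlingSource implements SourceFunction<Long> {
            private static final long serialVersionUID = 1L;
            private final int numElements;
            private volatile boolean running = true;

            IdlingSource(int numElements) {
                this.numElements = numElements;
            }

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                synchronized (ctx.getCheckpointLock()) {
                    for (long i = 0; i < numElements; i++) {
                        ctx.collect(i);
                    }
                }
                while (running) {
                    Thread.sleep(20);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }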