Date: Thu, 8 Mar 2018 12:23:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391167#comment-16391167 ]

ASF GitHub Bot commented on FLINK-8487:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r173142759

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import 
org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static LocalFlinkMiniCluster cluster = null; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + + haStorageDir = temporaryFolder.newFolder(); + + 
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString()); + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString()); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + cluster = TestBaseUtils.startCluster(config, false); + } + + @AfterClass + public static void tearDown() throws Exception { + stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + + zkServer.stop(); + zkServer.close(); + } + + /** + * Verify that we don't start a job from scratch if we cannot restore any of the + * CompletedCheckpoints. + * + *

<p>Synchronization for the different steps and things we want to observe happens via + * latches in the test method and the methods of {@link CheckpointBlockingFunction}. + * + *
<p>The test follows these steps: + *
<ol> + *
<li>Start job and block on a latch until we have done some checkpoints + *
<li>Block in the special function + *
<li>Move away the contents of the ZooKeeper HA directory to make restoring from + * checkpoints impossible + *
<li>Unblock the special function, which now induces a failure + *
<li>Make sure that the job does not recover successfully + *
<li>Move back the HA directory + *
<li>Make sure that the job recovers, we use a latch to ensure that the operator + * restored successfully + *
</ol>
+ */ + @Test(timeout = 120_000L) + public void testRestoreBehaviourWithFaultyStateHandles() throws Exception { + CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1); + CheckpointBlockingFunction.successfulRestores.set(0); + CheckpointBlockingFunction.illegalRestores.set(0); + CheckpointBlockingFunction.afterMessWithZooKeeper.set(false); + CheckpointBlockingFunction.failedAlready.set(false); + + waitForCheckpointLatch = new OneShotLatch(); + failInCheckpointLatch = new OneShotLatch(); + successfulRestoreLatch = new OneShotLatch(); + + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0)); + env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms + + File checkpointLocation = temporaryFolder.newFolder(); + env.setStateBackend(new FsStateBackend(checkpointLocation.toURI())); + + DataStreamSource<String> source = env.addSource(new UnboundedSource()); + + source + .keyBy(new KeySelector<String, String>() { + @Override + public String getKey(String value) { + return value; + } + }) + .map(new CheckpointBlockingFunction()); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID()); + + // Retrieve the job manager + final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft()); + + cluster.submitJobDetached(jobGraph); + + // wait until we did some checkpoints + waitForCheckpointLatch.await(); + + // mess with the HA directory so that the job cannot restore + File movedCheckpointLocation = temporaryFolder.newFolder(); + int numCheckpoints = 0; + File[] files = haStorageDir.listFiles(); + assertNotNull(files); + for (File file : files) { + if (file.getName().startsWith("completedCheckpoint")) { + assertTrue(file.renameTo(new File(movedCheckpointLocation, 
file.getName()))); + numCheckpoints++; + } + } + assertTrue(numCheckpoints > 0); + + failInCheckpointLatch.trigger(); + + // Ensure that we see at least one cycle where the job tries to restart and fails. + Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful( + new Callable<Future<JobStatus>>() { + @Override + public Future<JobStatus> call(){ + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction<JobStatus>() { + @Override + public boolean filter(JobStatus jobStatus){ + return jobStatus == JobStatus.RESTARTING; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.RESTARTING, jobStatusFuture.get()); + + jobStatusFuture = FutureUtils.retrySuccessful( + new Callable<Future<JobStatus>>() { + @Override + public Future<JobStatus> call() { + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction<JobStatus>() { + @Override + public boolean filter(JobStatus jobStatus) { + return jobStatus == JobStatus.FAILING; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.FAILING, jobStatusFuture.get()); + + // move back the HA directory so that the job can restore + CheckpointBlockingFunction.afterMessWithZooKeeper.set(true); + + files = movedCheckpointLocation.listFiles(); + assertNotNull(files); + for (File file : files) { + if (file.getName().startsWith("completedCheckpoint")) { + assertTrue(file.renameTo(new File(haStorageDir, file.getName()))); + } + } + + // now the job should be able to go to RUNNING again and then eventually to FINISHED + jobStatusFuture = FutureUtils.retrySuccessful( + new Callable<Future<JobStatus>>() { + @Override + public Future<JobStatus> call() { + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction<JobStatus>() { + @Override + public boolean filter(JobStatus jobStatus) { + return jobStatus == JobStatus.FINISHED; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.FINISHED, jobStatusFuture.get()); + + // make sure we saw a successful restore + successfulRestoreLatch.await(); + + assertThat("We 
saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0)); + } + + /** + * Requests the {@link JobStatus} of the job with the given {@link JobID}. + */ + private Future<JobStatus> getJobStatus( + final ActorGateway jobManager, + final JobID jobId, + final FiniteDuration timeout) { + + scala.concurrent.Future<Object> response = + jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout); + + FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response); + + return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() { + @Override + public JobStatus apply(Object value) { + if (value instanceof JobManagerMessages.CurrentJobStatus) { + return ((JobManagerMessages.CurrentJobStatus) value).status(); + } else if (value instanceof JobManagerMessages.JobNotFound) { + throw new RuntimeException( + new IllegalStateException("Could not find job with JobId " + jobId)); + } else { + throw new RuntimeException( + new IllegalStateException("Unknown JobManager response of type " + value.getClass())); + } + } + }); + } + + private static class UnboundedSource implements SourceFunction<String> { + private boolean running = true; + + @Override + public void run(SourceContext<String> ctx) throws Exception { + while (running) { + ctx.collect("hello"); + // don't overdo it ... ;-) + Thread.sleep(50); + if (CheckpointBlockingFunction.afterMessWithZooKeeper.get()) { + break; + } + } + } + + @Override + public void cancel() { + running = false; + } + } + + private static class CheckpointBlockingFunction + extends RichMapFunction<String, String> + implements CheckpointedFunction { + + // verify that we only call initializeState() + // once with isRestored() == false. All other invocations must have isRestored() == true. This + // verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't + // be read. + static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1); + + // we count when we see restores that are not allowed. 
We only + // allow restores once we messed with the HA directory and moved it back again + static AtomicInteger illegalRestores = new AtomicInteger(0); + static AtomicInteger successfulRestores = new AtomicInteger(0); + + // whether we are after the phase where we messed with the ZooKeeper HA directory, i.e. + // whether it's now ok for a restore to happen + static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false); + + static AtomicBoolean failedAlready = new AtomicBoolean(false); + + // also have some state to write to the checkpoint + private final ValueStateDescriptor<String> stateDescriptor = + new ValueStateDescriptor<>("state", StringSerializer.INSTANCE); + + @Override + public String map(String value) throws Exception { + getRuntimeContext().getState(stateDescriptor).update("42"); + return value; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (context.getCheckpointId() > 5) { + waitForCheckpointLatch.trigger(); + failInCheckpointLatch.await(); + if (!failedAlready.getAndSet(true)) { + throw new RuntimeException("Failing on purpose."); + } + } + } + + @Override + public void initializeState(FunctionInitializationContext context) { + if (!context.isRestored()) {
--- End diff --

No, this is exactly the thing we want to test. If we didn't have this check, we would allow the case where ZooKeeper cannot read any of the state handles and the job is started from scratch. There might be other ways around it, but I like this explicit way. What do you think?
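For anyone reading this without the full diff, the check under discussion can be sketched in isolation. What follows is a minimal, Flink-free sketch and not the code from the pull request: the diff context ends right after `if (!context.isRestored()) {`, so the branch bodies here are an assumption. `RestoreGuard`, `markStorageRepaired` and the boolean `isRestored` parameter are hypothetical names introduced for illustration; only the counter and flag names mirror the fields of `CheckpointBlockingFunction`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the bookkeeping in CheckpointBlockingFunction; not Flink code. */
final class RestoreGuard {
    // How many initialize calls may still observe isRestored == false.
    private final AtomicInteger allowedInitializeCallsWithoutRestore;
    private final AtomicInteger illegalRestores = new AtomicInteger(0);
    private final AtomicInteger successfulRestores = new AtomicInteger(0);
    // True once the checkpoint metadata has been moved back into place.
    private final AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);

    RestoreGuard(int allowedInitializeCallsWithoutRestore) {
        this.allowedInitializeCallsWithoutRestore =
            new AtomicInteger(allowedInitializeCallsWithoutRestore);
    }

    /** Marks the point after which restoring from a checkpoint is expected. */
    void markStorageRepaired() {
        afterMessWithZooKeeper.set(true);
    }

    /** Assumed shape of the check; isRestored stands in for context.isRestored(). */
    void initializeState(boolean isRestored) {
        if (!isRestored) {
            // A fresh start beyond the allowed budget means state was silently dropped.
            if (allowedInitializeCallsWithoutRestore.decrementAndGet() < 0) {
                illegalRestores.incrementAndGet();
            }
        } else if (!afterMessWithZooKeeper.get()) {
            // A restore while the checkpoint metadata is missing should be impossible.
            illegalRestores.incrementAndGet();
        } else {
            successfulRestores.incrementAndGet();
        }
    }

    int illegalRestores() {
        return illegalRestores.get();
    }

    int successfulRestores() {
        return successfulRestores.get();
    }

    public static void main(String[] args) {
        RestoreGuard guard = new RestoreGuard(1);
        guard.initializeState(false); // initial job start, the only allowed fresh start
        guard.markStorageRepaired();  // checkpoint metadata is readable again
        guard.initializeState(true);  // recovery from a completed checkpoint
        // prints "illegal=0 successful=1"
        System.out.println(
            "illegal=" + guard.illegalRestores() + " successful=" + guard.successfulRestores());
    }
}
```

With the budget set to 1, the initial job start is the only initialization allowed to see `isRestored == false`. Any later one would mean the job was restarted from scratch after the completed checkpoints became unreadable, and it is counted as illegal, which is the situation the test asserts never occurs.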
> State loss after multiple restart attempts
> ------------------------------------------
>
>                 Key: FLINK-8487
>                 URL: https://issues.apache.org/jira/browse/FLINK-8487
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute tumbling window. StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>         .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>     ..................
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after the HDFS issues were resolved.
> - It was evident that operator state was lost. Either the Kafka consumer kept advancing its offset between a start and the next checkpoint failure (a minute's worth), or the state of the operator that held partial aggregates was lost.
> The user did some in-depth analysis (see [mail thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichter83@gmail.com], can you have a look at this issue and check if it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)