Date: Wed, 7 Mar 2018 20:25:01 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

    [ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390154#comment-16390154 ]

ASF GitHub Bot commented on FLINK-8487:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r172972943

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	private static final int NUM_JMS = 1;
+	private static final int NUM_TMS = 1;
+	private static final int NUM_SLOTS_PER_TM = 1;
+
+	@ClassRule
+	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private static File haStorageDir;
+
+	private static TestingServer zkServer;
+
+	private static LocalFlinkMiniCluster cluster = null;
+
+	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		zkServer = new TestingServer();
+
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+
+		haStorageDir = temporaryFolder.newFolder();
+
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
+		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
+		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+		cluster = TestBaseUtils.startCluster(config, false);
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+
+		zkServer.stop();
+		zkServer.close();
+	}
+
+	/**
+	 * Verify that we don't start a job from scratch if we cannot restore any of the
+	 * CompletedCheckpoints.
+	 *
+	 * Synchronization for the different steps and things we want to observe happens via
+	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
+	 *
+	 * The test follows these steps:
+	 *
+	 *   1. Start job and block on a latch until we have done some checkpoints
+	 *   2. Block in the special function
+	 *   3. Move away the contents of the ZooKeeper HA directory to make restoring from
+	 *      checkpoints impossible
+	 *   4. Unblock the special function, which now induces a failure
+	 *   5. Make sure that the job does not recover successfully
+	 *   6. Move back the HA directory
+	 *   7. Make sure that the job recovers; we use a latch to ensure that the operator
+	 *      restored successfully
+	 */
+	@Test(timeout = 120_000L)
+	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
+		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
+		CheckpointBlockingFunction.successfulRestores.set(0);
+		CheckpointBlockingFunction.illegalRestores.set(0);
+		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
+		CheckpointBlockingFunction.failedAlready.set(false);
+
+		waitForCheckpointLatch = new OneShotLatch();
+		failInCheckpointLatch = new OneShotLatch();
+		successfulRestoreLatch = new OneShotLatch();
+
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
+		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+
+		File checkpointLocation = temporaryFolder.newFolder();
+		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
+
+		DataStreamSource<String> source = env.addSource(new UnboundedSource());
+
+		source
+			.keyBy(new KeySelector<String, String>() {
+				@Override
+				public String getKey(String value) {
+					return value;
+				}
+			})
+			.map(new CheckpointBlockingFunction());
+
+		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
+
+		// Retrieve the job manager
+		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
+
+		cluster.submitJobDetached(jobGraph);
+
+		// wait until we did some checkpoints
+		waitForCheckpointLatch.await();
+
+		// mess with the HA directory so that the job cannot restore
+		File movedCheckpointLocation = temporaryFolder.newFolder();
+		int numCheckpoints = 0;
+		File[] files = haStorageDir.listFiles();
+		assertNotNull(files);
+		for (File file : files) {
+			if (file.getName().startsWith("completedCheckpoint")) {
+				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
+				numCheckpoints++;
+			}
+		}
+		assertTrue(numCheckpoints > 0);
+
+		failInCheckpointLatch.trigger();
+
+		// Ensure that we see at least one cycle where the job tries to restart and fails.
+		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
+			new Callable<Future<JobStatus>>() {
+				@Override
+				public Future<JobStatus> call() {
+					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
+				}
+			},
+			new FilterFunction<JobStatus>() {
+				@Override
+				public boolean filter(JobStatus jobStatus) {
+					return jobStatus == JobStatus.RESTARTING;
+				}
+			},
+			deadline,
+			TestingUtils.defaultExecutor());
+		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
+
+		jobStatusFuture = FutureUtils.retrySuccessful(
+			new Callable<Future<JobStatus>>() {
+				@Override
+				public Future<JobStatus> call() {
+					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
+				}
+			},
+			new FilterFunction<JobStatus>() {
+				@Override
+				public boolean filter(JobStatus jobStatus) {
+					return jobStatus == JobStatus.FAILING;
+				}
+			},
+			deadline,
+			TestingUtils.defaultExecutor());
+		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
+
+		// move back the HA directory so that the job can restore
+		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
+
+		files = movedCheckpointLocation.listFiles();
+		assertNotNull(files);
+		for (File file : files) {
+			if (file.getName().startsWith("completedCheckpoint")) {
+				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
+			}
+		}
+
+		// now the job should be able to go to RUNNING again and then eventually to FINISHED
+		jobStatusFuture = FutureUtils.retrySuccessful(
+			new Callable<Future<JobStatus>>() {
+				@Override
+				public Future<JobStatus> call() {
+					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
+				}
+			},
+			new FilterFunction<JobStatus>() {
+				@Override
+				public boolean filter(JobStatus jobStatus) {
+					return jobStatus == JobStatus.FINISHED;
+				}
+			},
+			deadline,
+			TestingUtils.defaultExecutor());
+		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
+
+		// make sure we saw a successful restore
+		successfulRestoreLatch.await();
+
+		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
--- End diff --

    Or drop the `illegalRestores` - may not be necessary, see above.


> State loss after multiple restart attempts
> -------------------------------------------
>
>                 Key: FLINK-8487
>                 URL: https://issues.apache.org/jira/browse/FLINK-8487
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute tumbling window. StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>     .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
> ..................
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> 	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka consumer that kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had partial aggregates was lost.
> The user did some in-depth analysis (see [mail thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichter83@gmail.com], can you have a look at this issue and check if it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
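
For context, the job described in the scenario quoted above can be sketched roughly as follows. This is a minimal, hypothetical reconstruction using the Flink 1.3/1.4-era DataStream API (matching the issue's Affects Version), not the user's actual code: the topic name, Kafka properties, checkpoint path, checkpoint interval, and the simple per-key count logic are illustrative assumptions, and the WindowFunction that the user combined with the ReduceFunction is omitted.

{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class WindowedCountJob {

	public static void main(String[] args) throws Exception {
		final long windowSize = 15; // minutes, as in the report
		final long lateBy = 60;     // seconds, illustrative

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// RocksDB state backend, checkpointing to a (placeholder) HDFS path.
		env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
		env.enableCheckpointing(60_000); // one checkpoint per minute

		// Kafka source; topic and connection properties are illustrative.
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "kafka:9092");
		props.setProperty("group.id", "windowed-count");
		DataStream<String> events =
				env.addSource(new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props));

		events
				// turn each event into a (key, 1) pair so the window can count per key
				.map(new MapFunction<String, Tuple2<String, Long>>() {
					@Override
					public Tuple2<String, Long> map(String value) {
						return new Tuple2<>(value, 1L);
					}
				})
				.keyBy(0)
				.timeWindow(Time.of(windowSize, TimeUnit.MINUTES))
				.allowedLateness(Time.of(lateBy, TimeUnit.SECONDS))
				.reduce(new ReduceFunction<Tuple2<String, Long>>() {
					@Override
					public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
						return new Tuple2<>(a.f0, a.f1 + b.f1);
					}
				})
				.print();

		env.execute("Keyed 15-minute tumbling window counts");
	}
}
{code}

With checkpointing enabled, the Kafka consumer's offsets and the window's partial aggregates are both part of the job state, which is why losing the latest completed checkpoint after repeated restarts manifests as the data loss described in this issue.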