flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
Date Fri, 02 Mar 2018 14:22:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383622#comment-16383622
] 

ASF GitHub Bot commented on FLINK-8459:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171858593
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.client.program.MiniClusterClient;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
    +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
    +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
    +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.ExceptionUtils;
    +
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.hamcrest.Matchers.hasItem;
    +import static org.hamcrest.Matchers.isOneOf;
    +
    +/**
    + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String,
boolean, Time)}.
    + *
    + * @see org.apache.flink.runtime.jobmaster.JobMaster
    + */
    +@Category(Flip6.class)
    +public class JobMasterTriggerSavepointIT extends AbstractTestBase {
    +
    +	private static CountDownLatch invokeLatch;
    +
    +	private static volatile CountDownLatch triggerCheckpointLatch;
    +
    +	@Rule
    +	public TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private Path savepointDirectory;
    +	private MiniClusterClient clusterClient;
    +	private JobGraph jobGraph;
    +
    +	@Before
    +	public void setUp() throws Exception {
    +		invokeLatch = new CountDownLatch(1);
    +		triggerCheckpointLatch = new CountDownLatch(1);
    +		savepointDirectory = temporaryFolder.newFolder().toPath();
    +
    +		Assume.assumeTrue(
    +			"ClusterClient is not an instance of MiniClusterClient",
    +			miniClusterResource.getClusterClient() instanceof MiniClusterClient);
    +
    +		clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +
    +		jobGraph = new JobGraph();
    +
    +		final JobVertex vertex = new JobVertex("testVertex");
    +		vertex.setInvokableClass(NoOpBlockingInvokable.class);
    +		jobGraph.addVertex(vertex);
    +
    +		jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
    +			Collections.singletonList(vertex.getID()),
    +			Collections.singletonList(vertex.getID()),
    +			Collections.singletonList(vertex.getID()),
    +			new CheckpointCoordinatorConfiguration(
    +				10,
    +				60_000,
    +				10,
    +				1,
    +				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
    +				true),
    +			null
    +		));
    +
    +		clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
    +		invokeLatch.await(60, TimeUnit.SECONDS);
    +		waitForJob();
    +	}
    +
    +	@Test
    +	public void testStopJobAfterSavepoint() throws Exception {
    +		final String savepointLocation = cancelWithSavepoint();
    +		final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60,
TimeUnit.SECONDS);
    +
    +		assertThat(jobStatus, isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING));
    +
    +		final List<Path> savepoints = Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
    +		assertThat(savepoints, hasItem(Paths.get(savepointLocation).getFileName()));
    +	}
    +
    +	@Test
    +	public void testDoNotCancelJobIfSavepointFails() throws Exception {
    +		try {
    +			Files.setPosixFilePermissions(savepointDirectory, Collections.emptySet());
    +		} catch (IOException e) {
    +			Assume.assumeNoException(e);
    +		}
    +
    +		try {
    +			cancelWithSavepoint();
    +		} catch (Exception e) {
    +			assertThat(ExceptionUtils.findThrowable(e, CheckpointTriggerException.class).isPresent(),
equalTo(true));
    +		}
    +
    +		final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60,
TimeUnit.SECONDS);
    +		assertThat(jobStatus, equalTo(JobStatus.RUNNING));
    +
    +		// assert that checkpoints are continued to be triggered
    +		triggerCheckpointLatch = new CountDownLatch(1);
    +		assertThat(triggerCheckpointLatch.await(60, TimeUnit.SECONDS), equalTo(true));
    +	}
    +
    +	private void waitForJob() throws Exception {
    +		for (int i = 0; i < 60; i++) {
    --- End diff --
    
    no need to wait 1 minute


> Implement cancelWithSavepoint in RestClusterClient
> --------------------------------------------------
>
>                 Key: FLINK-8459
>                 URL: https://issues.apache.org/jira/browse/FLINK-8459
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> Implement the method
>         {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String
savepointDirectory)}}.
> by either taking a savepoint and cancel the job separately, or by migrating the logic
in {{JobCancellationWithSavepointHandlers}}. The former will have different semantics because
the checkpoint scheduler is not stopped. Thus it is not guaranteed that there won't be additional
checkpoints between the savepoint and the job cancelation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message