flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2423) Properly test checkpoint notifications
Date Tue, 04 Aug 2015 08:48:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14653285#comment-14653285 ]

ASF GitHub Bot commented on FLINK-2423:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/980#discussion_r36168137
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
---
    @@ -0,0 +1,429 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.functions.RichReduceFunction;
    +import org.apache.flink.api.common.state.OperatorState;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Integration test for the {@link CheckpointNotifier} interface. The test ensures that
    + * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for some completed
    + * checkpoints, that it is called at most once for any checkpoint id and that it is not
    + * called for a deliberately failed checkpoint.
    + *
    + * <p>
    + * Note that as a result of doing the checks on the task level there is no way to verify
    + * that the {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for every
    + * successfully completed checkpoint.
    + */
    +@SuppressWarnings("serial")
    +public class StreamCheckpointNotifierITCase {
    +
    +	private static final int NUM_TASK_MANAGERS = 2;
    +	private static final int NUM_TASK_SLOTS = 3;
    +	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
    +
    +	private static ForkableFlinkMiniCluster cluster;
    +
    +	@BeforeClass
    +	public static void startCluster() {
    +		try {
    +			Configuration config = new Configuration();
    +			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
    +			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
    +			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
    +			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
    +
    +			cluster = new ForkableFlinkMiniCluster(config, false);
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Failed to start test cluster: " + e.getMessage());
    +		}
    +	}
    +
    +	@AfterClass
    +	public static void shutdownCluster() {
    +		try {
    +			cluster.shutdown();
    +			cluster = null;
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Failed to stop test cluster: " + e.getMessage());
    +		}
    +	}
    +
    +
    +
    +	/**
    +	 * Runs the following program:
    +	 *
    +	 * <pre>
    +	 *     [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] ->
[ (groupBy/reduce)->(sink) ]
    +	 * </pre>
    +	 */
    +	@Test
    +	public void runCheckpointedProgram() {
    +
    +		final long NUM_STRINGS = 10000000L;
    +		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
    +
    +		try {
    +			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
    +					"localhost", cluster.getJobManagerRPCPort());
    +			env.setParallelism(PARALLELISM);
    +			env.enableCheckpointing(500);
    +			env.getConfig().disableSysoutLogging();
    +
    +			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
    +
    +			stream
    +					// -------------- first vertex, chained to the src ----------------
    +					.filter(new StringRichFilterFunction())
    +
    +					// -------------- second vertex, applying the co-map ----------------
    +					.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
    +
    +					// -------------- third vertex - the stateful one that also fails ----------------
    +					.map(new StringPrefixCountRichMapFunction())
    +					.startNewChain()
    +					.map(new IdentityMapFunction())
    +
    +							// -------------- fourth vertex - reducer and the sink ----------------
    +					.groupBy("prefix")
    +					.reduce(new OnceFailingReducer(NUM_STRINGS))
    +					.addSink(new RichSinkFunction<PrefixCount>() {
    +
    --- End diff --
    
    This sink is no longer needed and can be removed.


> Properly test checkpoint notifications
> --------------------------------------
>
>                 Key: FLINK-2423
>                 URL: https://issues.apache.org/jira/browse/FLINK-2423
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Gyula Fora
>            Assignee: Márton Balassi
>
> Checkpoint notifications (via the CheckpointNotifier interface) are currently not properly tested.
> A test should be included to verify that checkpoint notifications are eventually called on successful checkpoints, and that they are only called once per checkpoint ID.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message