flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9902) Improve and refactor window checkpointing IT cases
Date Fri, 20 Jul 2018 08:50:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550516#comment-16550516 ]

ASF GitHub Bot commented on FLINK-9902:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6376#discussion_r203973756
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.CheckpointListener;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +
    +import java.io.Serializable;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Source for window checkpointing IT cases that can introduce artificial failures.
    + */
    +public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
    +	implements ListCheckpointed<Integer>, CheckpointListener {
    +
    +	/**
    +	 * Function to generate and emit the test events (and watermarks if required).
    +	 */
    +	@FunctionalInterface
    +	public interface EventEmittingGenerator extends Serializable {
    +		void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo);
    +	}
    +
    +	private static final long INITIAL = Long.MIN_VALUE;
    +	private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE;
    +
    +	@Nonnull
    +	private final EventEmittingGenerator eventEmittingGenerator;
    +	private final int expectedEmitCalls;
    +	private final int failureAfterNumElements;
    +	private final boolean usingProcessingTime;
    +	private final AtomicLong checkpointStatus;
    +
    +	private int emitCallCount;
    +	private volatile boolean running;
    +
    +	public FailingSource(
    +		@Nonnull EventEmittingGenerator eventEmittingGenerator,
    +		@Nonnegative int numberOfGeneratorInvocations) {
    +		this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime);
    +	}
    +
    +	public FailingSource(
    +		@Nonnull EventEmittingGenerator eventEmittingGenerator,
    +		@Nonnegative int numberOfGeneratorInvocations,
    +		@Nonnull TimeCharacteristic timeCharacteristic) {
    +		this.eventEmittingGenerator = eventEmittingGenerator;
    +		this.running = true;
    +		this.emitCallCount = 0;
    +		this.expectedEmitCalls = numberOfGeneratorInvocations;
    +		this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
    +		this.checkpointStatus = new AtomicLong(INITIAL);
    +		this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
    +	}
    +
    +	@Override
    +	public void open(Configuration parameters) {
    +		// non-parallel source
    +		assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
    +	}
    +
    +	@Override
    +	public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception
{
    +
    +		final RuntimeContext runtimeContext = getRuntimeContext();
    +		// detect if this task is "the chosen one" and should fail (via subtaskidx), if it
did not fail before (via attempt)
    +		final boolean failThisTask =
    +			runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask()
== 0;
    +
    +		// we loop longer than we have elements, to permit delayed checkpoints
    +		// to still cause a failure
    +		while (running) {
    +
    +			// the function failed before, or we are in the elements before the failure
    +			synchronized (ctx.getCheckpointLock()) {
    +				eventEmittingGenerator.emitEvent(ctx, emitCallCount++);
    +				running &= (emitCallCount < expectedEmitCalls);
    +			}
    +
    +			if (emitCallCount < failureAfterNumElements) {
    +				Thread.sleep(1);
    +			} else if (failThisTask && emitCallCount == failureAfterNumElements) {
    +				// wait for a pending checkpoint that fulfills our requirements if needed
    +				while (checkpointStatus.get() != STATEFUL_CHECKPOINT_COMPLETED) {
    +					Thread.sleep(1);
    +				}
    +				throw new Exception("Artificial Failure");
    +			}
    +		}
    +
    +		if (usingProcessingTime) {
    --- End diff --
    
    Why are we only looping at the end when using processing time? Before this change, the source looped indefinitely so that checkpoints could still go through.


> Improve and refactor window checkpointing IT cases
> --------------------------------------------------
>
>                 Key: FLINK-9902
>                 URL: https://issues.apache.org/jira/browse/FLINK-9902
>             Project: Flink
>          Issue Type: Test
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.7.0
>
>
> Windowing IT cases currently contain a lot of duplicated code that could be unified and deduplicated. Furthermore, the tests do not fail on problems with timer snapshots, because either there are no timers in the snapshot or all timers are re-inserted before they trigger. If we change this, we can cover timers as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message