flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ken Krugler <kkrugler_li...@transpac.com>
Subject OneInputStreamOperatorTestHarness.snapshot doesn't include timers?
Date Wed, 15 Aug 2018 22:24:50 GMT
Hi all,

It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot
fails to include any timers that had been registered via registerProcessingTimeTimer but had
not yet fired when the snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness to ensure
that timers are saved?

Below are the unit test, which shows how the test harness is being set up and run, and the TimerFunction class that it exercises.

The TimerFunction used in this test does seem to be doing the right thing: when I use it in
a simple job on a local Flink cluster, creating a savepoint and then restarting from that
savepoint works as expected.


— Ken

package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

public class TimerTest {
    public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);

    private List<Long> _firedTimers = new ArrayList<Long>();

    public void setUp() throws Exception {
    public void testTimerSaving() throws Throwable {
        // This operator doesn't really do much at all, but the first element
        // it processes will create a timer for (timestamp+1).
        // Whenever that timer fires, it will create another timer for 
        // (timestamp+1).
        KeyedProcessOperator<Integer, Integer, Integer> operator = 
            new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
            makeTestHarness(operator, null);
        // We begin at time zero

        // Process some elements, which should also create a timer for time 1.
        int inputs[] = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
        // Save the state, which we assume should include the timer we set for
        // time 10.
        OperatorSubtaskState savedState = 
            testHarness.snapshot(0L, testHarness.getProcessingTime());
        // Close the first test harness
        // Create a new test harness using the saved state (which we assume
        // includes the timer for time 10).
        testHarness = makeTestHarness(operator, savedState);
        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
        // Close the second test harness and make sure all the timers we expect
        // actually fired.
        for (long i = 1; i < 20; i++) {
            // TODO This expectation currently fails, since Timers don't
            // seem to be included in the snapshot, at least the one produced by
            // the test harness.

    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) 
            throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = 
            new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                    new TimerTool.IdentityKeySelector<Integer>(),
        if (savedState != null) {
        return result;

package com.scaleunlimited.flinkcrawler.functions;

import java.util.List;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimerFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
    static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
    List<Long> _firedTimers;
    long _period;

    public TimerFunction(List<Long> firedTimers) {
        this(firedTimers, 1);

    public TimerFunction(List<Long> firedTimers, long period) {
        _firedTimers = firedTimers;
        _period = period;

    public void onTimer(long timestamp,
                        KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext
                        Collector<Integer> out) throws Exception {
        super.onTimer(timestamp, context, out);
        long nextTimestamp = timestamp + _period;
        LOGGER.info("Firing at {}; Setting new timer for {}",

    public void processElement( Integer input,
                                KeyedProcessFunction<Integer, Integer, Integer>.Context
                                Collector<Integer> out)
        throws Exception {
        LOGGER.info("Processing input {}", input);
        if (_firedTimers.isEmpty()) {
            long firstTimestamp = 
                context.timerService().currentProcessingTime() + _period;
            LOGGER.info("Setting initial timer for {}",

Ken Krugler
+1 530-210-6378
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

View raw message