flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests
Date Fri, 28 Oct 2016 09:42:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15614926#comment-15614926
] 

ASF GitHub Bot commented on FLINK-4552:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2572#discussion_r85500759
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
---
    @@ -0,0 +1,369 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.runtime.operators.windowing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.state.MergingState;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
    +import org.apache.flink.runtime.query.KvStateRegistry;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateBackend;
    +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.api.operators.TestInternalTimerService;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    +import org.apache.flink.streaming.api.windowing.windows.Window;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +
    +/**
    + * Utility for testing {@link Trigger} behaviour.
    + */
    +public class TriggerTestHarness<T, W extends Window> {
    +
    +	private static final Integer KEY = 1;
    +
    +	private final Trigger<T, W> trigger;
    +	private final TypeSerializer<W> windowSerializer;
    +
    +	private final HeapKeyedStateBackend<Integer> stateBackend;
    +	private final TestInternalTimerService<Integer, W> internalTimerService;
    +
    +	public TriggerTestHarness(
    +			Trigger<T, W> trigger,
    +			TypeSerializer<W> windowSerializer) throws Exception {
    +		this.trigger = trigger;
    +		this.windowSerializer = windowSerializer;
    +
    +		// we only ever use one key, other tests make sure that windows work across different
    +		// keys
    +		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
    +		MemoryStateBackend backend = new MemoryStateBackend();
    +
    +		@SuppressWarnings("unchecked")
    +		HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>)
backend.createKeyedStateBackend(dummyEnv,
    +				new JobID(),
    +				"test_op",
    +				IntSerializer.INSTANCE,
    +				1,
    +				new KeyGroupRange(0, 0),
    +				new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
    +		this.stateBackend = stateBackend;
    +
    +		this.stateBackend.setCurrentKey(0);
    +
    +		this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
    +			@Override
    +			public void setCurrentKey(Object key) {
    +				// ignore
    +			}
    +
    +			@Override
    +			public Object getCurrentKey() {
    +				return KEY;
    +			}
    +		});
    +	}
    +
    +	public int numProcessingTimeTimers() {
    +		return internalTimerService.numProcessingTimeTimers();
    +	}
    +
    +	public int numProcessingTimeTimers(W window) {
    +		return internalTimerService.numProcessingTimeTimers(window);
    +	}
    +
    +	public int numEventTimeTimers() {
    +		return internalTimerService.numEventTimeTimers();
    +	}
    +
    +	public int numEventTimeTimers(W window) {
    +		return internalTimerService.numEventTimeTimers(window);
    +	}
    +
    +	public int numStateEntries() {
    +		return stateBackend.numStateEntries();
    +	}
    +
    +	public int numStateEntries(W window) {
    +		return stateBackend.numStateEntries(window);
    +	}
    +
    +	/**
    +	 * Injects one element into the trigger for the given window and returns the result
of
    +	 * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
    +	 */
    +	public TriggerResult processElement(StreamRecord<T> element, W window) throws
Exception {
    +		TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
    +				KEY,
    +				window,
    +				internalTimerService,
    +				stateBackend,
    +				windowSerializer);
    +		return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
    +	}
    +
    +	/**
    +	 * Advanced processing time and checks whether we have exactly one firing for the given
    +	 * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
    +	 * is returned for that firing.
    +	 */
    +	public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
    +		Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
    +
    +		if (firings.size() != 1) {
    +			throw new IllegalStateException("Must have exactly one timer firing. Fired timers:
" + firings);
    +		}
    +
    +		Tuple2<W, TriggerResult> firing = firings.iterator().next();
    --- End diff --
    
    With the check in the lines above (that checks for the size of `firings`) it should be
correct, right?


> Refactor WindowOperator/Trigger Tests
> -------------------------------------
>
>                 Key: FLINK-4552
>                 URL: https://issues.apache.org/jira/browse/FLINK-4552
>             Project: Flink
>          Issue Type: Improvement
>          Components: Windowing Operators
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and {{WindowFunction}}
are all conflated in {{WindowOperatorTest}}. All of these test that a certain combination
of a {{Trigger}}, {{WindowAssigner}} and {{WindowFunction}} produce the expected output.
> We should modularize these tests and spread them out across multiple files, possibly
one per trigger, for the triggers. Also, we should extend/change the tests in some key ways:
>  - {{WindowOperatorTest}} test should just verify that the interaction between {{WindowOperator}}
and the various other parts works as expected, that the correct methods on {{Trigger}} and
{{WindowFunction}} are called at the expected time and that snapshotting, timers, cleanup
etc. work correctly. These tests should also verify that the different state types and {{WindowFunctions}}
work correctly.
>  - {{Trigger}} tests should present elements to triggers and verify that they fire at
the correct times. The actual output of the {{WindowFunction}} is not important for these
tests. We should also test that triggers correctly clean up state and timers.
>  - {{WindowAssigner}} tests should test each window assigner and also verify that, for
example, the offset parameter of time-based windows works correctly.
> There is already {{WindowingTestHarness}} but it is not used by tests, I think we can
expand on that and provide more thorough test coverage while also making the tests more maintainable
({{WindowOperatorTest.java}} is nearing 3000 lines of code).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message