Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F3BC0200BA3 for ; Thu, 20 Oct 2016 16:15:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F2502160AE0; Thu, 20 Oct 2016 14:15:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 94933160ADB for ; Thu, 20 Oct 2016 16:15:20 +0200 (CEST) Received: (qmail 66193 invoked by uid 500); 20 Oct 2016 14:15:19 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 66169 invoked by uid 99); 20 Oct 2016 14:15:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2016 14:15:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 45281E3934; Thu, 20 Oct 2016 14:15:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Date: Thu, 20 Oct 2016 14:15:19 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/8] flink git commit: [FLINK-4842] Introduce test to enforce order of operator / udf lifecycles archived-at: Thu, 20 Oct 2016 14:15:22 -0000 Repository: flink Updated Branches: refs/heads/master 428419d59 -> 1e475c768 [FLINK-4842] Introduce test to enforce order of operator / udf lifecycles Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e475c76 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e475c76 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e475c76 Branch: refs/heads/master Commit: 1e475c768ae0d7e13746a3ca6aa258141016d419 Parents: cab9cd4 Author: Stefan Richter Authored: Thu Oct 13 11:32:19 2016 +0200 Committer: Aljoscha Krettek Committed: Thu Oct 20 16:14:21 2016 +0200 ---------------------------------------------------------------------- .../AbstractUdfStreamOperatorLifecycleTest.java | 293 +++++++++++++++++++ .../AbstractUdfStreamOperatorTest.java | 219 -------------- .../streaming/runtime/tasks/StreamTaskTest.java | 4 +- 3 files changed, 295 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java new file mode 100644 index 0000000..cbb833b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskTest; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * This test secures the lifecycle of AbstractUdfStreamOperator, including it's UDF handling. + */ +public class AbstractUdfStreamOperatorLifecycleTest { + + private static final List EXPECTED_CALL_ORDER_FULL = Arrays.asList( + "OPERATOR::setup", + "UDF::setRuntimeContext", + "OPERATOR::initializeState", + "OPERATOR::open", + "UDF::open", + "OPERATOR::run", + "UDF::run", + "OPERATOR::snapshotState", + "OPERATOR::close", + "UDF::close", + "OPERATOR::dispose"); + + private static final List EXPECTED_CALL_ORDER_CANCEL_RUNNING = Arrays.asList( + "OPERATOR::setup", + "UDF::setRuntimeContext", + "OPERATOR::initializeState", + "OPERATOR::open", + "UDF::open", + "OPERATOR::run", + "UDF::run", + "OPERATOR::cancel", + "UDF::cancel", + "OPERATOR::dispose", + "UDF::close"); + + private static final String ALL_METHODS_STREAM_OPERATOR = "[close[], dispose[], getChainingStrategy[], " + + "getMetricGroup[], initializeState[class org.apache.flink.streaming.runtime.tasks.OperatorStateHandles], " + + "notifyOfCompletedCheckpoint[long], open[], setChainingStrategy[class " + + "org.apache.flink.streaming.api.operators.ChainingStrategy], setKeyContextElement1[class " + + "org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " + + "setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " + + "setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " + + "org.apache.flink.streaming.api.graph.StreamConfig, interface " + + "org.apache.flink.streaming.api.operators.Output], snapshotState[long, long, " + + "interface org.apache.flink.runtime.state.CheckpointStreamFactory]]"; + + private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" + + ", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " + + "org.apache.flink.api.common.functions.RuntimeContext]]"; + + private static final List ACTUAL_ORDER_TRACKING = + Collections.synchronizedList(new ArrayList(EXPECTED_CALL_ORDER_FULL.size())); + + @Test + public void testAllMethodsRegisteredInTest() { + List methodsWithSignatureString = new ArrayList<>(); + for (Method method : StreamOperator.class.getMethods()) { + methodsWithSignatureString.add(method.getName() + Arrays.toString(method.getParameterTypes())); + } + Collections.sort(methodsWithSignatureString); + Assert.assertEquals("It seems like new methods have been introduced to " + StreamOperator.class + + ". Please register them with this test and ensure to document their position in the lifecycle " + + "(if applicable).", ALL_METHODS_STREAM_OPERATOR, methodsWithSignatureString.toString()); + + methodsWithSignatureString = new ArrayList<>(); + for (Method method : RichFunction.class.getMethods()) { + methodsWithSignatureString.add(method.getName() + Arrays.toString(method.getParameterTypes())); + } + Collections.sort(methodsWithSignatureString); + Assert.assertEquals("It seems like new methods have been introduced to " + RichFunction.class + + ". Please register them with this test and ensure to document their position in the lifecycle " + + "(if applicable).", ALL_METHODS_RICH_FUNCTION, methodsWithSignatureString.toString()); + } + + @Test + public void testLifeCycleFull() throws Exception { + ACTUAL_ORDER_TRACKING.clear(); + + Configuration taskManagerConfig = new Configuration(); + StreamConfig cfg = new StreamConfig(new Configuration()); + MockSourceFunction srcFun = new MockSourceFunction(); + + cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, true)); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, taskManagerConfig); + + task.startTaskThread(); + + LifecycleTrackingStreamSource.runStarted.await(); + + // wait for clean termination + task.getExecutingThread().join(); + assertEquals(ExecutionState.FINISHED, task.getExecutionState()); + assertEquals(EXPECTED_CALL_ORDER_FULL, ACTUAL_ORDER_TRACKING); + } + + @Test + public void testLifeCycleCancel() throws Exception { + ACTUAL_ORDER_TRACKING.clear(); + + Configuration taskManagerConfig = new Configuration(); + StreamConfig cfg = new StreamConfig(new Configuration()); + MockSourceFunction srcFun = new MockSourceFunction(); + cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, false)); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, taskManagerConfig); + + task.startTaskThread(); + LifecycleTrackingStreamSource.runStarted.await(); + + // this should cancel the task even though it is blocked on runFinished + task.cancelExecution(); + + // wait for clean termination + task.getExecutingThread().join(); + assertEquals(ExecutionState.CANCELED, task.getExecutionState()); + assertEquals(EXPECTED_CALL_ORDER_CANCEL_RUNNING, ACTUAL_ORDER_TRACKING); + } + + private static class MockSourceFunction extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext ctx) { + ACTUAL_ORDER_TRACKING.add("UDF::run"); + } + + @Override + public void cancel() { + ACTUAL_ORDER_TRACKING.add("UDF::cancel"); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + ACTUAL_ORDER_TRACKING.add("UDF::setRuntimeContext"); + super.setRuntimeContext(t); + } + + @Override + public void open(Configuration parameters) throws Exception { + ACTUAL_ORDER_TRACKING.add("UDF::open"); + super.open(parameters); + } + + @Override + public void close() throws Exception { + ACTUAL_ORDER_TRACKING.add("UDF::close"); + super.close(); + } + } + + private static class LifecycleTrackingStreamSource> + extends StreamSource implements Serializable { + + private static final long serialVersionUID = 2431488948886850562L; + private transient Thread testCheckpointer; + + private final boolean simulateCheckpointing; + + static OneShotLatch runStarted; + static OneShotLatch runFinish; + + public LifecycleTrackingStreamSource(SRC sourceFunction, boolean simulateCheckpointing) { + super(sourceFunction); + this.simulateCheckpointing = simulateCheckpointing; + runStarted = new OneShotLatch(); + runFinish = new OneShotLatch(); + } + + @Override + public void run(Object lockingObject, Output> collector) throws Exception { + ACTUAL_ORDER_TRACKING.add("OPERATOR::run"); + super.run(lockingObject, collector); + runStarted.trigger(); + runFinish.await(); + } + + @Override + public void setup(StreamTask containingTask, StreamConfig config, Output> output) { + ACTUAL_ORDER_TRACKING.add("OPERATOR::setup"); + super.setup(containingTask, config, output); + if (simulateCheckpointing) { + testCheckpointer = new Thread() { + @Override + public void run() { + long id = 0; + while (true) { + try { + Thread.sleep(50); + if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint( + new CheckpointMetaData(id++, System.currentTimeMillis()))) { + LifecycleTrackingStreamSource.runFinish.trigger(); + break; + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + }; + testCheckpointer.start(); + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotState"); + super.snapshotState(context); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState"); + super.initializeState(context); + } + + @Override + public void open() throws Exception { + ACTUAL_ORDER_TRACKING.add("OPERATOR::open"); + super.open(); + } + + @Override + public void close() throws Exception { + ACTUAL_ORDER_TRACKING.add("OPERATOR::close"); + super.close(); + } + + @Override + public void cancel() { + ACTUAL_ORDER_TRACKING.add("OPERATOR::cancel"); + super.cancel(); + } + + @Override + public void dispose() throws Exception { + ACTUAL_ORDER_TRACKING.add("OPERATOR::dispose"); + super.dispose(); + if (simulateCheckpointing) { + testCheckpointer.join(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java deleted file mode 100644 index f5d633c..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.filecache.FileCache; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.runtime.taskmanager.CheckpointResponder; -import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.runtime.taskmanager.TaskManagerConnection; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.util.SerializedValue; -import org.junit.Test; - -import java.io.Serializable; -import java.net.URL; -import java.util.Collections; -import java.util.concurrent.Executor; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class AbstractUdfStreamOperatorTest { - - @Test - public void testLifeCycle() throws Exception { - - Configuration taskManagerConfig = new Configuration(); - - StreamConfig cfg = new StreamConfig(new Configuration()); - cfg.setStreamOperator(new LifecycleTrackingStreamSource(new MockSourceFunction())); - cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig); - - task.startTaskThread(); - - // wait for clean termination - task.getExecutingThread().join(); - assertEquals(ExecutionState.FINISHED, task.getExecutionState()); - } - - private static class MockSourceFunction extends RichSourceFunction { - - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext ctx) { - } - - @Override - public void cancel() { - } - - @Override - public void setRuntimeContext(RuntimeContext t) { - System.out.println("!setRuntimeContext"); - super.setRuntimeContext(t); - } - - @Override - public void open(Configuration parameters) throws Exception { - System.out.println("!open"); - super.open(parameters); - } - - @Override - public void close() throws Exception { - System.out.println("!close"); - super.close(); - } - } - - private Task createTask( - Class invokable, - StreamConfig taskConfig, - Configuration taskManagerConfig) throws Exception { - - LibraryCacheManager libCache = mock(LibraryCacheManager.class); - when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); - - ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); - ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); - PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class); - Executor executor = mock(Executor.class); - - NetworkEnvironment network = mock(NetworkEnvironment.class); - when(network.getResultPartitionManager()).thenReturn(partitionManager); - when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); - when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) - .thenReturn(mock(TaskKvStateRegistry.class)); - - TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), - new SerializedValue<>(new ExecutionConfig()), - "Test Task", 1, 0, 1, 0, - new Configuration(), - taskConfig.getConfiguration(), - invokable.getName(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - 0); - - return new Task( - tdd, - mock(MemoryManager.class), - mock(IOManager.class), - network, - mock(BroadcastVariableManager.class), - mock(TaskManagerConnection.class), - mock(InputSplitProvider.class), - mock(CheckpointResponder.class), - libCache, - mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), - new UnregisteredTaskMetricsGroup(), - consumableNotifier, - partitionStateChecker, - executor); - } - - static class LifecycleTrackingStreamSource> - extends StreamSource implements Serializable { - - //private transient final AtomicInteger currentState; - - private static final long serialVersionUID = 2431488948886850562L; - - public LifecycleTrackingStreamSource(SRC sourceFunction) { - super(sourceFunction); - } - - @Override - public void setup(StreamTask containingTask, StreamConfig config, Output> output) { - System.out.println("setup"); - super.setup(containingTask, config, output); - } - - @Override - public void snapshotState(StateSnapshotContext context) throws Exception { - System.out.println("snapshotState"); - super.snapshotState(context); - } - - @Override - public void initializeState(StateInitializationContext context) throws Exception { - System.out.println("initializeState"); - super.initializeState(context); - } - - @Override - public void open() throws Exception { - System.out.println("open"); - super.open(); - } - - @Override - public void close() throws Exception { - System.out.println("close"); - super.close(); - } - - @Override - public void dispose() throws Exception { - super.dispose(); - System.out.println("dispose"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 8aae19f..94f6d5a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -200,13 +200,13 @@ public class StreamTaskTest { } } - private Task createTask( + public static Task createTask( Class invokable, StreamConfig taskConfig, Configuration taskManagerConfig) throws Exception { LibraryCacheManager libCache = mock(LibraryCacheManager.class); - when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); + when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader()); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);