Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7A3F6200D24 for ; Tue, 24 Oct 2017 16:14:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 78FDB160BF3; Tue, 24 Oct 2017 14:14:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 81D92160BDB for ; Tue, 24 Oct 2017 16:14:16 +0200 (CEST) Received: (qmail 37828 invoked by uid 500); 24 Oct 2017 14:14:15 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 37809 invoked by uid 99); 24 Oct 2017 14:14:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Oct 2017 14:14:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 17702DF9C2; Tue, 24 Oct 2017 14:14:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Date: Tue, 24 Oct 2017 14:14:13 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/5] flink git commit: [hotfix][tests] Add easier way to chain operator in StreamTaskTestHarness archived-at: Tue, 24 Oct 2017 14:14:17 -0000 [hotfix][tests] Add easier way to chain operator in StreamTaskTestHarness Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5bebef1d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5bebef1d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5bebef1d Branch: refs/heads/master Commit: 5bebef1da4a6dbcbda49ce44e148b1dfc36c273f Parents: 03c1785 Author: Piotr Nowojski Authored: Wed Oct 18 16:01:38 2017 +0200 Committer: Aljoscha Krettek Committed: Tue Oct 24 15:06:43 2017 +0200 ---------------------------------------------------------------------- .../runtime/tasks/OneInputStreamTaskTest.java | 70 +---------- .../runtime/tasks/StreamConfigChainer.java | 118 +++++++++++++++++++ .../runtime/tasks/StreamTaskTestHarness.java | 5 + 3 files changed, 128 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5bebef1d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 8d80d66..3834633 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; @@ -47,7 +46,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -61,7 +59,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -253,71 +250,14 @@ public class OneInputStreamTaskTest extends TestLogger { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - // ------------------ setup the chain ------------------ - TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator(); - StreamConfig headOperatorConfig = testHarness.getStreamConfig(); - WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator(); - StreamConfig watermarkOperatorConfig = new StreamConfig(new Configuration()); - TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator(); - StreamConfig tailOperatorConfig = new StreamConfig(new Configuration()); - - headOperatorConfig.setStreamOperator(headOperator); - headOperatorConfig.setOperatorID(new OperatorID(42L, 42L)); - headOperatorConfig.setChainStart(); - headOperatorConfig.setChainIndex(0); - headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge( - new StreamNode(null, 0, null, null, null, null, null), - new StreamNode(null, 1, null, null, null, null, null), - 0, - Collections.emptyList(), - null, - null - ))); - - watermarkOperatorConfig.setStreamOperator(watermarkOperator); - watermarkOperatorConfig.setOperatorID(new OperatorID(4711L, 42L)); - watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE); - watermarkOperatorConfig.setChainIndex(1); - watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge( - new StreamNode(null, 1, null, null, null, null, null), - new StreamNode(null, 2, null, null, null, null, null), - 0, - Collections.emptyList(), - null, - null - ))); - - List outEdgesInOrder = new LinkedList(); - outEdgesInOrder.add(new StreamEdge( - new StreamNode(null, 2, null, null, null, null, null), - new StreamNode(null, 3, null, null, null, null, null), - 0, - Collections.emptyList(), - new BroadcastPartitioner(), - null)); - - tailOperatorConfig.setStreamOperator(tailOperator); - tailOperatorConfig.setOperatorID(new OperatorID(123L, 123L)); - tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE); - tailOperatorConfig.setBufferTimeout(0); - tailOperatorConfig.setChainIndex(2); - tailOperatorConfig.setChainEnd(); - tailOperatorConfig.setOutputSelectors(Collections.>emptyList()); - tailOperatorConfig.setNumberOfOutputs(1); - tailOperatorConfig.setOutEdgesInOrder(outEdgesInOrder); - tailOperatorConfig.setNonChainedOutputs(outEdgesInOrder); - tailOperatorConfig.setTypeSerializerOut(StringSerializer.INSTANCE); - - Map chainedConfigs = new HashMap<>(2); - chainedConfigs.put(1, watermarkOperatorConfig); - chainedConfigs.put(2, tailOperatorConfig); - headOperatorConfig.setTransitiveChainedTaskConfigs(chainedConfigs); - headOperatorConfig.setOutEdgesInOrder(outEdgesInOrder); - - // ----------------------------------------------------- + + testHarness.setupOperatorChain(new OperatorID(42L, 42L), headOperator) + .chain(new OperatorID(4711L, 42L), watermarkOperator, StringSerializer.INSTANCE) + .chain(new OperatorID(123L, 123L), tailOperator, StringSerializer.INSTANCE) + .finish(); // --------------------- begin test --------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5bebef1d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java new file mode 100644 index 0000000..74898a4 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Helper class to build StreamConfig for chain of operators. + */ +public class StreamConfigChainer { + private final StreamConfig headConfig; + private final Map chainedConfigs = new HashMap<>(); + + private StreamConfig tailConfig; + private int chainIndex = 0; + + public StreamConfigChainer(OperatorID headOperatorID, StreamOperator headOperator, StreamConfig headConfig) { + this.headConfig = checkNotNull(headConfig); + this.tailConfig = checkNotNull(headConfig); + + head(headOperator, headOperatorID); + } + + private void head(StreamOperator headOperator, OperatorID headOperatorID) { + headConfig.setStreamOperator(headOperator); + headConfig.setOperatorID(headOperatorID); + headConfig.setChainStart(); + headConfig.setChainIndex(chainIndex); + } + + public StreamConfigChainer chain( + OperatorID operatorID, + OneInputStreamOperator operator, + TypeSerializer typeSerializer) { + return chain(operatorID, operator, typeSerializer, typeSerializer); + } + + public StreamConfigChainer chain( + OperatorID operatorID, + OneInputStreamOperator operator, + TypeSerializer inputSerializer, + TypeSerializer outputSerializer) { + chainIndex++; + + tailConfig.setChainedOutputs(Collections.singletonList( + new StreamEdge( + new StreamNode(null, tailConfig.getChainIndex(), null, null, null, null, null), + new StreamNode(null, chainIndex, null, null, null, null, null), + 0, + Collections.emptyList(), + null, + null))); + tailConfig = new StreamConfig(new Configuration()); + tailConfig.setStreamOperator(checkNotNull(operator)); + tailConfig.setOperatorID(checkNotNull(operatorID)); + tailConfig.setTypeSerializerIn1(inputSerializer); + tailConfig.setTypeSerializerOut(outputSerializer); + tailConfig.setChainIndex(chainIndex); + + chainedConfigs.put(chainIndex, tailConfig); + + return this; + } + + public void finish() { + + List outEdgesInOrder = new LinkedList(); + outEdgesInOrder.add( + new StreamEdge( + new StreamNode(null, chainIndex, null, null, null, null, null), + new StreamNode(null, chainIndex , null, null, null, null, null), + 0, + Collections.emptyList(), + new BroadcastPartitioner(), + null)); + + tailConfig.setBufferTimeout(0); + tailConfig.setChainEnd(); + tailConfig.setOutputSelectors(Collections.emptyList()); + tailConfig.setNumberOfOutputs(1); + tailConfig.setOutEdgesInOrder(outEdgesInOrder); + tailConfig.setNonChainedOutputs(outEdgesInOrder); + headConfig.setTransitiveChainedTaskConfigs(chainedConfigs); + headConfig.setOutEdgesInOrder(outEdgesInOrder); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5bebef1d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 19d48e1..5b15477 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; @@ -370,6 +371,10 @@ public class StreamTaskTestHarness { } } + public StreamConfigChainer setupOperatorChain(OperatorID headOperatorId, OneInputStreamOperator headOperator) { + return new StreamConfigChainer(headOperatorId, headOperator, getStreamConfig()); + } + // ------------------------------------------------------------------------ private class TaskThread extends Thread {