flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8704) Migrate tests from TestingCluster to MiniClusterResource
Date Wed, 28 Mar 2018 15:43:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417581#comment-16417581 ]

ASF GitHub Bot commented on FLINK-8704:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5697#discussion_r177795751
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager;
    +
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
    +import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    +import org.apache.flink.runtime.jobgraph.DistributionPattern;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
    +import org.apache.flink.runtime.testingUtils.TestingCluster;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    +
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.util.List;
    +
    +import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
    +
    +public class LegacyScheduleOrUpdateConsumersTest extends TestLogger {
    +
    +	private static final int NUMBER_OF_TMS = 2;
    +	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    +	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
    +
    +	private static TestingCluster flink;
    +
    +	@BeforeClass
    +	public static void setUp() throws Exception {
    +		flink = TestingUtils.startTestingCluster(
    +				NUMBER_OF_SLOTS_PER_TM,
    +				NUMBER_OF_TMS,
    +				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		flink.stop();
    +	}
    +
    +	/**
    +	 * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
    +	 * result.
    +	 *
    +	 * <pre>
    +	 *                             +----------+
    +	 *            +-- pipelined -> | Receiver |
    +	 * +--------+ |                +----------+
    +	 * | Sender |-|
    +	 * +--------+ |                +----------+
    +	 *            +-- blocking --> | Receiver |
    +	 *                             +----------+
    +	 * </pre>
    +	 *
    +	 * The pipelined receiver gets deployed after the first buffer is available and the blocking
    +	 * one after all subtasks are finished.
    +	 */
    +	@Test
    +	public void testMixedPipelinedAndBlockingResults() throws Exception {
    +		final JobVertex sender = new JobVertex("Sender");
    +		sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
    +		sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
    +		sender.setParallelism(PARALLELISM);
    +
    +		final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
    +		pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
    +		pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
    +		pipelinedReceiver.setParallelism(PARALLELISM);
    +
    +		pipelinedReceiver.connectNewDataSetAsInput(
    +				sender,
    +				DistributionPattern.ALL_TO_ALL,
    +				ResultPartitionType.PIPELINED);
    +
    +		final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
    +		blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
    +		blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
    +		blockingReceiver.setParallelism(PARALLELISM);
    +
    +		blockingReceiver.connectNewDataSetAsInput(sender,
    +				DistributionPattern.ALL_TO_ALL,
    +				ResultPartitionType.BLOCKING);
    +
    +		SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
    +				sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
    +
    +		sender.setSlotSharingGroup(slotSharingGroup);
    +		pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
    +		blockingReceiver.setSlotSharingGroup(slotSharingGroup);
    +
    +		final JobGraph jobGraph = new JobGraph(
    +				"Mixed pipelined and blocking result",
    +				sender,
    +				pipelinedReceiver,
    +				blockingReceiver);
    --- End diff --
    
    Could we deduplicate this code?


> Migrate tests from TestingCluster to MiniClusterResource
> --------------------------------------------------------
>
>                 Key: FLINK-8704
>                 URL: https://issues.apache.org/jira/browse/FLINK-8704
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>            Reporter: Aljoscha Krettek
>            Assignee: Chesnay Schepler
>            Priority: Blocker
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message