Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBC4C10E2A for ; Thu, 19 Feb 2015 19:53:25 +0000 (UTC) Received: (qmail 42447 invoked by uid 500); 19 Feb 2015 19:53:25 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 42365 invoked by uid 500); 19 Feb 2015 19:53:25 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 42140 invoked by uid 99); 19 Feb 2015 19:53:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Feb 2015 19:53:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6FA7EE0990; Thu, 19 Feb 2015 19:53:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Thu, 19 Feb 2015 19:53:33 -0000 Message-Id: In-Reply-To: <04534d449b19436e807441da68397558@git.apache.org> References: <04534d449b19436e807441da68397558@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [9/9] flink git commit: [tests] Cleanup style and timeouts in recovery restart tests [tests] Cleanup style and timeouts in recovery restart tests Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a51c02f6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a51c02f6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a51c02f6 Branch: refs/heads/master Commit: a51c02f6e8be948d71a00c492808115d622379a7 Parents: 665e601 Author: Stephan Ewen Authored: Thu Feb 19 20:31:56 2015 +0100 Committer: Stephan Ewen Committed: Thu Feb 19 20:31:56 2015 +0100 ---------------------------------------------------------------------- .../runtime/jobmanager/RecoveryITCase.scala | 23 +++++++++++---- .../runtime/testingUtils/TestingUtils.scala | 3 +- flink-tests/src/test/resources/log4j.properties | 27 ----------------- .../jobmanager/JobManagerFailsITCase.scala | 24 ++++++++------- .../taskmanager/TaskManagerFailsITCase.scala | 31 +++++++++----------- 5 files changed, 46 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index 2c1f82f..e7d1d83 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -21,13 +21,13 @@ package org.apache.flink.runtime.jobmanager import akka.actor.Status.Success import akka.actor.{ActorRef, PoisonPill, ActorSystem} import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, -AbstractJobVertex} +import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern,AbstractJobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver} import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import org.apache.flink.runtime.testingUtils.TestingUtils +import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils} import org.junit.runner.RunWith import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import org.scalatest.junit.JUnitRunner @@ -35,12 +35,23 @@ import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) class RecoveryITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { + def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } + def startTestClusterWithHeartbeatTimeout(numSlots: Int, + numTaskManagers: Int, + heartbeatTimeout: String): TestingCluster = { + val config = new Configuration() + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers) + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout) + new TestingCluster(config) + } + val NUM_TASKS = 31 "The recovery" must { @@ -61,7 +72,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender, receiver) jobGraph.setNumberOfExecutionRetries(1) - val cluster = TestingUtils.startTestingCluster(2 * NUM_TASKS) + val cluster = startTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 s") val jm = cluster.getJobManager try { @@ -104,7 +115,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender, receiver) jobGraph.setNumberOfExecutionRetries(1) - val cluster = TestingUtils.startTestingCluster(NUM_TASKS) + val cluster = startTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s") val jm = cluster.getJobManager try { @@ -147,7 +158,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender, receiver) jobGraph.setNumberOfExecutionRetries(1) - val cluster = TestingUtils.startTestingCluster(NUM_TASKS, 2) + val cluster = startTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s") val jm = cluster.getJobManager http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 72ed9e7..147cc8a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -96,8 +96,7 @@ object TestingUtils { } def startTestingCluster(numSlots: Int, numTMs: Int = 1, - timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): - TestingCluster = { + timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs) http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/log4j.properties b/flink-tests/src/test/resources/log4j.properties deleted file mode 100644 index 6bf344a..0000000 --- a/flink-tests/src/test/resources/log4j.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# This file ensures that tests executed from the IDE show log output - -log4j.rootLogger=INFO, console - -# Log all infos in the given file -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target = System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index ac1864c..416470f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -48,16 +48,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { "A TaskManager" should { "detect a lost connection to the JobManager and try to reconnect to it" in { - val num_slots = 11 - - val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, num_slots) - config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") - config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5) - val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false) + val num_slots = 11 + val cluster = startDeathwatchCluster(num_slots, 1) val tm = cluster.getTaskManagers(0) val jm = cluster.getJobManager @@ -100,7 +93,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { noOp.setInvokableClass(classOf[NoOpInvokable]) val jobGraph2 = new JobGraph("NoOp Testjob", noOp) - val cluster = ForkableFlinkMiniCluster.startClusterDeathWatch(num_slots / 2, 2) + val cluster = startDeathwatchCluster(num_slots / 2, 2) var jm = cluster.getJobManager val tm = cluster.getTaskManagers(0) @@ -135,4 +128,15 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { } } } + + def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = { + val config = new Configuration() + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers) + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") + config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5) + + new ForkableFlinkMiniCluster(config, singleActorSystem = false) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index c81ec88..245bcd9 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -52,16 +52,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { "The JobManager" should { "detect a failing task manager" in { - val num_slots = 11 - - val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, num_slots) - config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2) - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") - config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5) - val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false) + val num_slots = 11 + val cluster = startDeathwatchCluster(num_slots, 2) val taskManagers = cluster.getTaskManagers val jm = cluster.getJobManager @@ -89,6 +82,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { } "handle gracefully failing task manager" in { + val num_tasks = 31 val sender = new AbstractJobVertex("Sender") val receiver = new AbstractJobVertex("Receiver") @@ -190,14 +184,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { noOp.setInvokableClass(classOf[NoOpInvokable]) val jobGraph2 = new JobGraph("NoOp Testjob", noOp) - val config = new Configuration() - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, num_slots/2) - config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2) - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") - config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5) - - val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false) + val cluster = startDeathwatchCluster(num_slots/2, 2) var tm = cluster.getTaskManagers(0) val jm = cluster.getJobManager @@ -239,4 +226,14 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { } } + def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = { + val config = new Configuration() + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers) + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") + config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5) + + new ForkableFlinkMiniCluster(config, singleActorSystem = false) + } }