Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A4A05200C3F for ; Wed, 22 Mar 2017 12:14:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A06EC160B91; Wed, 22 Mar 2017 11:14:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C12C4160B83 for ; Wed, 22 Mar 2017 12:14:25 +0100 (CET) Received: (qmail 2627 invoked by uid 500); 22 Mar 2017 11:14:24 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 2616 invoked by uid 99); 22 Mar 2017 11:14:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Mar 2017 11:14:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 85B92DFC15; Wed, 22 Mar 2017 11:14:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Wed, 22 Mar 2017 11:14:24 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-6134] Set UUID(0L, 0L) as default leader session id archived-at: Wed, 22 Mar 2017 11:14:26 -0000 Repository: flink Updated Branches: refs/heads/master 034d9a3ab -> 2dfd463e2 http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index 236e922..e4d0f65 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -41,12 +41,10 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; @@ -70,7 +68,6 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -168,91 +165,6 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { } /** - * Tests that submissions to non-leaders are handled. - */ - @Test - public void testSubmitJobToNonLeader() throws Exception { - Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( - ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); - - // Configure the cluster - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - - TestingCluster flink = new TestingCluster(config, false, false); - - try { - final Deadline deadline = TestTimeOut.fromNow(); - - // Start the JobManager and TaskManager - flink.start(true); - - JobGraph jobGraph = createBlockingJobGraph(); - - List bothJobManagers = flink.getJobManagersAsJava(); - - ActorGateway leadingJobManager = flink.getLeaderGateway(deadline.timeLeft()); - - ActorGateway nonLeadingJobManager; - if (bothJobManagers.get(0).equals(leadingJobManager.actor())) { - nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(1), null); - } - else { - nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(0), null); - } - - log.info("Leading job manager: " + leadingJobManager); - log.info("Non-leading job manager: " + nonLeadingJobManager); - - // Submit the job - nonLeadingJobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); - - log.info("Submitted job graph to " + nonLeadingJobManager); - - // Wait for the job to start. We are asking the *leading** JM here although we've - // submitted the job to the non-leading JM. This is the behaviour under test. - JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, - leadingJobManager, deadline.timeLeft()); - - log.info("Wait that the non-leader removes the submitted job."); - - // Make sure that the **non-leading** JM has actually removed the job graph from its - // local state. - boolean success = false; - while (!success && deadline.hasTimeLeft()) { - JobStatusResponse jobStatusResponse = JobManagerActorTestUtils.requestJobStatus( - jobGraph.getJobID(), nonLeadingJobManager, deadline.timeLeft()); - - if (jobStatusResponse instanceof JobManagerMessages.JobNotFound) { - success = true; - } - else { - log.info(((JobManagerMessages.CurrentJobStatus)jobStatusResponse).status().toString()); - Thread.sleep(100); - } - } - - if (!success) { - fail("Non-leading JM was still holding reference to the job graph."); - } - - Future jobRemoved = leadingJobManager.ask( - new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), - deadline.timeLeft()); - - leadingJobManager.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); - - Await.ready(jobRemoved, deadline.timeLeft()); - } - finally { - flink.shutdown(); - } - - // Verify that everything is clean - verifyCleanRecoveryState(config); - } - - /** * Tests that clients receive updates after recovery by a new leader. */ @Test http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java index 440cfff..e38fab4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -24,6 +24,7 @@ import akka.testkit.JavaTestKit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -75,7 +76,7 @@ public class LocalFlinkMiniClusterITCase { final ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); new JavaTestKit(system) {{ - final ActorGateway selfGateway = new AkkaActorGateway(getRef(), null); + final ActorGateway selfGateway = new AkkaActorGateway(getRef(), HighAvailabilityServices.DEFAULT_LEADER_ID); new Within(TestingUtils.TESTING_DURATION()) { http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index 8534ba8..a09c5b2 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -93,7 +93,9 @@ public class UtilsTest extends TestLogger { Configuration flinkConfig = new Configuration(); YarnConfiguration yarnConfig = new YarnConfiguration(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); String applicationMasterHostName = "localhost"; String webInterfaceURL = "foobar"; ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(