flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-6134] Set UUID(0L, 0L) as default leader session id
Date Wed, 22 Mar 2017 11:14:24 GMT
Repository: flink
Updated Branches:
  refs/heads/master 034d9a3ab -> 2dfd463e2


http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 236e922..e4d0f65 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -41,12 +41,10 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
@@ -70,7 +68,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -168,91 +165,6 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	}
 
 	/**
-	 * Tests that submissions to non-leaders are handled.
-	 */
-	@Test
-	public void testSubmitJobToNonLeader() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
-
-		// Configure the cluster
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-
-		TestingCluster flink = new TestingCluster(config, false, false);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Start the JobManager and TaskManager
-			flink.start(true);
-
-			JobGraph jobGraph = createBlockingJobGraph();
-
-			List<ActorRef> bothJobManagers = flink.getJobManagersAsJava();
-
-			ActorGateway leadingJobManager = flink.getLeaderGateway(deadline.timeLeft());
-
-			ActorGateway nonLeadingJobManager;
-			if (bothJobManagers.get(0).equals(leadingJobManager.actor())) {
-				nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(1), null);
-			}
-			else {
-				nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(0), null);
-			}
-
-			log.info("Leading job manager: " + leadingJobManager);
-			log.info("Non-leading job manager: " + nonLeadingJobManager);
-
-			// Submit the job
-			nonLeadingJobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
-
-			log.info("Submitted job graph to " + nonLeadingJobManager);
-
-			// Wait for the job to start. We are asking the *leading** JM here although we've
-			// submitted the job to the non-leading JM. This is the behaviour under test.
-			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
-					leadingJobManager, deadline.timeLeft());
-
-			log.info("Wait that the non-leader removes the submitted job.");
-
-			// Make sure that the **non-leading** JM has actually removed the job graph from its
-			// local state.
-			boolean success = false;
-			while (!success && deadline.hasTimeLeft()) {
-				JobStatusResponse jobStatusResponse = JobManagerActorTestUtils.requestJobStatus(
-						jobGraph.getJobID(), nonLeadingJobManager, deadline.timeLeft());
-
-				if (jobStatusResponse instanceof JobManagerMessages.JobNotFound) {
-					success = true;
-				}
-				else {
-					log.info(((JobManagerMessages.CurrentJobStatus)jobStatusResponse).status().toString());
-					Thread.sleep(100);
-				}
-			}
-
-			if (!success) {
-				fail("Non-leading JM was still holding reference to the job graph.");
-			}
-
-			Future<Object> jobRemoved = leadingJobManager.ask(
-				new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
-				deadline.timeLeft());
-
-			leadingJobManager.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
-
-			Await.ready(jobRemoved, deadline.timeLeft());
-		}
-		finally {
-			flink.shutdown();
-		}
-
-		// Verify that everything is clean
-		verifyCleanRecoveryState(config);
-	}
-
-	/**
 	 * Tests that clients receive updates after recovery by a new leader.
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 440cfff..e38fab4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -24,6 +24,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -75,7 +76,7 @@ public class LocalFlinkMiniClusterITCase {
 			final ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
 			new JavaTestKit(system) {{
-				final ActorGateway selfGateway = new AkkaActorGateway(getRef(), null);
+				final ActorGateway selfGateway = new AkkaActorGateway(getRef(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 				new Within(TestingUtils.TESTING_DURATION()) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 8534ba8..a09c5b2 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -93,7 +93,9 @@ public class UtilsTest extends TestLogger {
 
 			Configuration flinkConfig = new Configuration();
 			YarnConfiguration yarnConfig = new YarnConfiguration();
-			TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+			TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+				null,
+				null);
 			String applicationMasterHostName = "localhost";
 			String webInterfaceURL = "foobar";
 			ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(


Mime
View raw message