flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] flink git commit: [FLINK-4273] Modify JobClient to attach to running jobs
Date Thu, 25 Aug 2016 13:59:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master 444315a12 -> 259a3a556


http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 9640fcd..df4f95a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient}
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -67,6 +67,8 @@ trait TestingJobManagerLike extends FlinkActor {
       override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
     })
 
+  val waitForClient = scala.collection.mutable.HashSet[ActorRef]() // client-connect subscribers
+
   val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
 
   var disconnectDisabled = false
@@ -328,6 +330,14 @@ trait TestingJobManagerLike extends FlinkActor {
 
       waitForLeader.clear()
 
+    case NotifyWhenClientConnects =>
+      waitForClient += sender()
+      sender() ! true
+
+    case msg: RegisterJobClient =>
+      super.handleMessage(msg)
+      waitForClient.foreach(_ ! true)
+
     case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
       if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager)
{
         // there are already at least numRegisteredTaskManager registered --> send Acknowledge

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index a411c8b..a88ed43 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -83,6 +83,11 @@ object TestingJobManagerMessages {
   case object NotifyWhenLeader
 
   /**
+    * Replies `true` at once, then `true` again for each RegisterJobClient the JobManager handles.
+    */
+  case object NotifyWhenClientConnects
+
+  /**
    * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
    * message when at least numRegisteredTaskManager have registered at the JobManager.
    *
@@ -111,6 +116,7 @@ object TestingJobManagerMessages {
   case class ResponseSavepoint(savepoint: Savepoint)
 
   def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
+  def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects // for Java callers
   def getDisablePostStop(): AnyRef = DisablePostStop
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 073164c0..2adf7eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -25,17 +25,17 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import scala.concurrent.Await;
@@ -45,6 +45,8 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.messages.JobManagerMessages.*;
+
 public class JobClientActorTest extends TestLogger {
 
 	private static ActorSystem system;
@@ -62,8 +64,8 @@ public class JobClientActorTest extends TestLogger {
 	}
 
 	/** Tests that a {@link JobClientActorSubmissionTimeoutException} is thrown when the job cannot
-	 * be submitted by the JobClientActor. This is here the case, because the started JobManager
-	 * never replies to a SubmitJob message.
+	 * be submitted by the JobSubmissionClientActor. This is here the case, because the started JobManager
+	 * never replies to a {@link SubmitJob} message.
 	 *
 	 * @throws Exception
 	 */
@@ -84,7 +86,7 @@ public class JobClientActorTest extends TestLogger {
 			leaderSessionID
 		);
 
-		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+		Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
 			testingLeaderRetrievalService,
 			jobClientActorTimeout,
 			false);
@@ -100,19 +102,56 @@ public class JobClientActorTest extends TestLogger {
 		Await.result(jobExecutionResult, timeout);
 	}
 
+
+	/** Tests that a {@link JobClientActorRegistrationTimeoutException} is thrown when the registration
+	 * cannot be performed at the JobManager by the JobAttachmentClientActor. This is the case here
+	 * because the started JobManager never replies to a {@link RegisterJobClient} message.
+	 */
+	@Test(expected=JobClientActorRegistrationTimeoutException.class)
+	public void testRegistrationTimeout() throws Exception {
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+		UUID leaderSessionID = UUID.randomUUID();
+
+		ActorRef jobManager = system.actorOf(
+			Props.create(
+				PlainActor.class,
+				leaderSessionID));
+
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+			jobManager.path().toString(),
+			leaderSessionID
+		);
+
+		Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
+			testingLeaderRetrievalService,
+			jobClientActorTimeout,
+			false);
+
+		ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+		Future<Object> jobExecutionResult = Patterns.ask(
+			jobClientActor,
+			new AttachToJobAndWait(testJobGraph.getJobID()),
+			new Timeout(timeout));
+
+		Await.result(jobExecutionResult, timeout);
+	}
+
 	/** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
-	 * is thrown when the JobClientActor wants to submit a job but has not connected to a JobManager.
+	 * is thrown when the JobSubmissionClientActor wants to submit a job but has not connected to a JobManager.
 	 *
 	 * @throws Exception
 	 */
 	@Test(expected=JobClientActorConnectionTimeoutException.class)
-	public void testConnectionTimeoutWithoutJobManager() throws Exception {
+	public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Exception {
 		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
 		FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
 		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
 
-		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+		Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
 			testingLeaderRetrievalService,
 			jobClientActorTimeout,
 			false);
@@ -128,6 +167,32 @@ public class JobClientActorTest extends TestLogger {
 	}
 
 	/** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
+	 * is thrown when the JobAttachmentClientActor wants to attach to a job but has not
+	 * connected to a JobManager.
+	 */
+	@Test(expected=JobClientActorConnectionTimeoutException.class)
+	public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Exception {
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+		Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
+			testingLeaderRetrievalService,
+			jobClientActorTimeout,
+			false);
+
+		ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+		Future<Object> jobExecutionResult = Patterns.ask(
+			jobClientActor,
+			new AttachToJobAndWait(testJobGraph.getJobID()),
+			new Timeout(timeout));
+
+		Await.result(jobExecutionResult, timeout);
+	}
+
+	/** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
 	 * is thrown after a successful job submission if the JobManager dies.
 	 *
 	 * @throws Exception
@@ -149,7 +214,7 @@ public class JobClientActorTest extends TestLogger {
 			leaderSessionID
 		);
 
-		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+		Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
 			testingLeaderRetrievalService,
 			jobClientActorTimeout,
 			false);
@@ -170,6 +235,91 @@ public class JobClientActorTest extends TestLogger {
 		Await.result(jobExecutionResult, timeout);
 	}
 
+	/** Tests that a {@link JobClientActorConnectionTimeoutException} is thrown
+	 * after a successful registration of the client if the JobManager dies.
+	 */
+	@Test(expected=JobClientActorConnectionTimeoutException.class)
+	public void testConnectionTimeoutAfterJobRegistration() throws Exception {
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+		UUID leaderSessionID = UUID.randomUUID();
+
+		ActorRef jobManager = system.actorOf(
+			Props.create(
+				JobAcceptingActor.class,
+				leaderSessionID));
+
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+			jobManager.path().toString(),
+			leaderSessionID
+		);
+
+		Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
+			testingLeaderRetrievalService,
+			jobClientActorTimeout,
+			false);
+
+		ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+		Future<Object> jobExecutionResult = Patterns.ask(
+			jobClientActor,
+			new AttachToJobAndWait(testJobGraph.getJobID()),
+			new Timeout(timeout));
+
+		// wait until the JobManager has received the client's RegisterJobClient message
+		Future<Object> waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout));
+		Await.result(waitFuture, timeout);
+
+		// kill the JobManager so that the registered client loses its connection
+		jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+		Await.result(jobExecutionResult, timeout);
+	}
+
+
+	/** Tests that JobClient throws an exception if the JobClientActor dies and can no longer
+	 * answer {@link akka.actor.Identify} messages.
+	 */
+	@Test
+	public void testGuaranteedAnswerIfJobClientDies() throws Exception {
+		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
+
+		UUID leaderSessionID = UUID.randomUUID();
+
+		ActorRef jobManager = system.actorOf(
+			Props.create(
+				JobAcceptingActor.class,
+				leaderSessionID));
+
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+			jobManager.path().toString(),
+			leaderSessionID
+		);
+
+		JobListeningContext jobListeningContext =
+			JobClient.submitJob(
+				system,
+				testingLeaderRetrievalService,
+				testJobGraph,
+				timeout,
+				false,
+				getClass().getClassLoader());
+
+		Future<Object> waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout));
+		Await.result(waitFuture, timeout);
+
+		// kill the job client actor which has been registered at the JobManager
+		jobListeningContext.getJobClientActor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+		try {
+			// should not block but throw an exception
+			JobClient.awaitJobResult(jobListeningContext);
+			Assert.fail("Expected a JobExecutionException because the JobClientActor has been killed.");
+		} catch (JobExecutionException e) {
+			// this is what we want
+		}
+	}
+
 	public static class PlainActor extends FlinkUntypedActor {
 
 		private final UUID leaderSessionID;
@@ -180,7 +330,6 @@ public class JobClientActorTest extends TestLogger {
 
 		@Override
 		protected void handleMessage(Object message) throws Exception {
-
 		}
 
 		@Override
@@ -200,17 +349,29 @@ public class JobClientActorTest extends TestLogger {
 
 		@Override
 		protected void handleMessage(Object message) throws Exception {
-			if (message instanceof JobManagerMessages.SubmitJob) {
+			if (message instanceof SubmitJob) {
 				getSender().tell(
-					new JobManagerMessages.JobSubmitSuccess(((JobManagerMessages.SubmitJob) message).jobGraph().getJobID()),
+					new JobSubmitSuccess(((SubmitJob) message).jobGraph().getJobID()),
 					getSelf());
 
 				jobAccepted = true;
 
-				if(testFuture != ActorRef.noSender()) {
+				if (testFuture != ActorRef.noSender()) {
 					testFuture.tell(Messages.getAcknowledge(), getSelf());
 				}
-			} else if (message instanceof RegisterTest) {
+			}
+			else if (message instanceof RegisterJobClient) {
+				getSender().tell(
+					new RegisterJobClientSuccess(((RegisterJobClient) message).jobID()),
+					getSelf());
+
+				jobAccepted = true;
+
+				if (testFuture != ActorRef.noSender()) {
+					testFuture.tell(Messages.getAcknowledge(), getSelf());
+				}
+			}
+			else if (message instanceof RegisterTest) {
 				testFuture = getSender();
 
 				if (jobAccepted) {
@@ -226,4 +387,5 @@ public class JobClientActorTest extends TestLogger {
 	}
 
 	public static class RegisterTest{}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c71bd35..426dfba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -286,7 +286,6 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		JobInfo expectedJobInfo = expected.getJobInfo();
 		JobInfo actualJobInfo = actual.getJobInfo();
 
-		assertEquals(expectedJobInfo.listeningBehaviour(), actualJobInfo.listeningBehaviour());
-		assertEquals(expectedJobInfo.start(), actualJobInfo.start());
+		assertEquals(expectedJobInfo, actualJobInfo);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
new file mode 100644
index 0000000..db17ee8
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.clients.examples;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.collection.Seq;
+
+import java.util.concurrent.Semaphore;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Tests retrieval of a job from a running Flink cluster
+ */
+public class JobRetrievalITCase extends TestLogger {
+
+	private static final Semaphore lock = new Semaphore(1);
+
+	private static FlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void before() {
+		cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+		cluster.start();
+	}
+
+	@AfterClass
+	public static void after() {
+		cluster.stop();
+		cluster = null;
+	}
+
+	@Test
+	public void testJobRetrieval() throws Exception {
+		final JobID jobID = new JobID();
+
+		final JobVertex imalock = new JobVertex("imalock");
+		imalock.setInvokableClass(SemaphoreInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
+
+		final ClusterClient client = new StandaloneClusterClient(cluster.configuration());
+
+		// acquire the lock to make sure that the job cannot complete until the job client
+		// has been attached in resumingThread
+		lock.acquire();
+		client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader());
+
+		final Thread resumingThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					assertNotNull(client.retrieveJob(jobID));
+				} catch (JobExecutionException e) {
+					fail(e.getMessage());
+				}
+			}
+		});
+
+		final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
+		final ActorSystem actorSystem = actorSystemSeq.last();
+		JavaTestKit testkit = new JavaTestKit(actorSystem);
+
+		final ActorRef jm = cluster.getJobManagersAsJava().get(0);
+		// wait until client connects
+		jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
+		// confirm registration
+		testkit.expectMsgEquals(true);
+
+		// kick off resuming
+		resumingThread.start();
+
+		// wait for client to connect
+		testkit.expectMsgEquals(true);
+		// client has connected, we can release the lock
+		lock.release();
+
+		resumingThread.join();
+	}
+
+	@Test
+	public void testNonExistingJobRetrieval() throws Exception {
+		final JobID jobID = new JobID();
+		ClusterClient client = new StandaloneClusterClient(cluster.configuration());
+
+		try {
+			client.retrieveJob(jobID);
+			fail();
+		} catch (JobRetrievalException e) {
+			// this is what we want
+		}
+	}
+
+
+	public static class SemaphoreInvokable extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			lock.acquire();
+		}
+	}
+
+}


Mime
View raw message