Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 97DBF200B72 for ; Fri, 26 Aug 2016 15:30:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9670B160AB6; Fri, 26 Aug 2016 13:30:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B521C160AB0 for ; Fri, 26 Aug 2016 15:30:18 +0200 (CEST) Received: (qmail 51346 invoked by uid 500); 26 Aug 2016 13:30:17 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 51337 invoked by uid 99); 26 Aug 2016 13:30:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Aug 2016 13:30:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B5EEFE020A; Fri, 26 Aug 2016 13:30:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxm@apache.org To: commits@flink.apache.org Message-Id: <933b7a8885634fb5bbcdc8f29f0006da@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-4273] adapt JobRetrievalITCase to lazy classloader reconstruction Date: Fri, 26 Aug 2016 13:30:17 +0000 (UTC) archived-at: Fri, 26 Aug 2016 13:30:19 -0000 Repository: flink Updated Branches: refs/heads/master abb449678 -> b05ea6939 [FLINK-4273] adapt JobRetrievalITCase to lazy classloader reconstruction Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b05ea693 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b05ea693 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b05ea693 Branch: refs/heads/master Commit: b05ea693984a5f5bf2e53f89d9fbd531e7be83fd Parents: abb4496 Author: Maximilian Michels Authored: Wed Aug 24 10:11:45 2016 +0200 Committer: Maximilian Michels Committed: Fri Aug 26 15:29:46 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/client/JobAttachmentClientActor.java | 3 ++- .../org/apache/flink/runtime/jobmanager/JobManager.scala | 2 +- .../runtime/testingUtils/TestingJobManagerLike.scala | 7 +++++-- .../runtime/testingUtils/TestingJobManagerMessages.scala | 11 +++++++++++ .../flink/test/clients/examples/JobRetrievalITCase.java | 8 +++++--- 5 files changed, 24 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java index 5446002..ffab9cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java @@ -130,7 +130,8 @@ public class JobAttachmentClientActor extends JobClientActor { } private void tryToAttachToJob() { - LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", jobID); + LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", + jobManager, jobID); Futures.future(new Callable() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d35fb0a..0e28d98 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -478,7 +478,7 @@ class JobManager( val client = sender() currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => - log.info("Registering client for job $jobID") + log.info(s"Registering client for job $jobID") jobInfo.clients += ((client, listeningBehaviour)) val listener = new StatusListenerMessenger(client, leaderSessionID.orNull) executionGraph.registerJobStatusListener(listener) http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index df4f95a..6a9b490 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged -import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient} +import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps} import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat @@ -336,7 +336,10 @@ trait TestingJobManagerLike extends FlinkActor { case msg: RegisterJobClient => super.handleMessage(msg) - waitForClient.foreach(_ ! true) + waitForClient.foreach(_ ! ClientConnected) + case msg: RequestClassloadingProps => + super.handleMessage(msg) + waitForClient.foreach(_ ! ClassLoadingPropsDelivered) case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) => if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) { http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index a88ed43..f121305 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -86,6 +86,14 @@ object TestingJobManagerMessages { * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs */ case object NotifyWhenClientConnects + /** + * Notifes of client connect + */ + case object ClientConnected + /** + * Notifies when the client has requested class loading information + */ + case object ClassLoadingPropsDelivered /** * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]] @@ -119,4 +127,7 @@ object TestingJobManagerMessages { def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects def getDisablePostStop(): AnyRef = DisablePostStop + def getClientConnected(): AnyRef = ClientConnected + def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered + } http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java index db17ee8..c9059f1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.runtime.client.JobRetrievalException; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -86,7 +85,7 @@ public class JobRetrievalITCase extends TestLogger { public void run() { try { assertNotNull(client.retrieveJob(jobID)); - } catch (JobExecutionException e) { + } catch (Throwable e) { fail(e.getMessage()); } } @@ -106,7 +105,10 @@ public class JobRetrievalITCase extends TestLogger { resumingThread.start(); // wait for client to connect - testkit.expectMsgEquals(true); + testkit.expectMsgAllOf( + TestingJobManagerMessages.getClientConnected(), + TestingJobManagerMessages.getClassLoadingPropsDelivered()); + // client has connected, we can release the lock lock.release();