Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C487F200D5E for ; Fri, 17 Nov 2017 12:03:21 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C3142160BFB; Fri, 17 Nov 2017 11:03:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 021FB160C0C for ; Fri, 17 Nov 2017 12:03:20 +0100 (CET) Received: (qmail 19427 invoked by uid 500); 17 Nov 2017 11:03:20 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 19215 invoked by uid 99); 17 Nov 2017 11:03:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Nov 2017 11:03:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5270F5E68; Fri, 17 Nov 2017 11:03:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kkloudas@apache.org To: commits@flink.apache.org Date: Fri, 17 Nov 2017 11:03:23 -0000 Message-Id: <9707e0b56a9c4080bb07bea1b60b8d7b@git.apache.org> In-Reply-To: <12e0b81a84ba42d0aeaa22907a4120a3@git.apache.org> References: <12e0b81a84ba42d0aeaa22907a4120a3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/6] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds. archived-at: Fri, 17 Nov 2017 11:03:21 -0000 [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a68d752 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a68d752 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a68d752 Branch: refs/heads/release-1.4 Commit: 1a68d7527932b12bd2cb392c7c7781023756bf0c Parents: 12b0c58 Author: kkloudas Authored: Thu Nov 16 17:45:49 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 11:20:55 2017 +0100 ---------------------------------------------------------------------- .../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++----- .../flink/runtime/jobmanager/JobManager.scala | 4 +-- .../runtime/jobmanager/JobManagerTest.java | 5 +-- 3 files changed, 29 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index a789dbd..65e9bb5 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that duplicate query registrations fail the job at the JobManager. - * - * NOTE: This test is only in the non-HA variant of the tests because - * in the HA mode we use the actual JM code which does not recognize the - * {@code NotifyWhenJobStatus} message. */ @Test public void testDuplicateRegistrationFailsJob() throws Exception { @@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that the correct exception is thrown if the query - * contains a wrong queryable state name. + * contains a wrong jobId or wrong queryable state name. */ @Test - public void testWrongQueryableStateName() throws Exception { + public void testWrongJobIdAndWrongQueryableStateName() throws Exception { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); @@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); assertEquals(JobStatus.RUNNING, jobStatus.state()); - CompletableFuture>> future = client.getKvState( + final JobID wrongJobId = new JobID(); + + CompletableFuture>> unknownJobFuture = client.getKvState( + wrongJobId, // this is the wrong job id + "hankuna", + 0, + BasicTypeInfo.INT_TYPE_INFO, + valueState); + + try { + unknownJobFuture.get(); + fail(); // by now the job must have failed. + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof RuntimeException); + Assert.assertTrue(e.getCause().getMessage().contains( + "FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")")); + } catch (Exception ignored) { + fail("Unexpected type of exception."); + } + + CompletableFuture>> unknownQSName = client.getKvState( jobId, "wrong-hankuna", // this is the wrong name. 0, @@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { valueState); try { - future.get(); + unknownQSName.get(); fail(); // by now the job must have failed. } catch (ExecutionException e) { Assert.assertTrue(e.getCause() instanceof RuntimeException); http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4fb1196..f57637a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _} -import org.apache.flink.runtime.messages.{Acknowledge, StackTrace} +import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry} @@ -1503,7 +1503,7 @@ class JobManager( } case None => - sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found")) + sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId)) } // TaskManager KvState registration http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index a697aae..6a02d1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -63,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse; @@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger { try { Await.result(lookupFuture, deadline.timeLeft()); fail("Did not throw expected Exception"); - } catch (IllegalStateException ignored) { + } catch (FlinkJobNotFoundException ignored) { // Expected } @@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger { try { Await.result(lookupFuture, deadline.timeLeft()); fail("Did not throw expected Exception"); - } catch (IllegalStateException ignored) { + } catch (FlinkJobNotFoundException ignored) { // Expected }