Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4170F200C1D for ; Thu, 16 Feb 2017 17:11:59 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4021C160B57; Thu, 16 Feb 2017 16:11:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3FBD9160B61 for ; Thu, 16 Feb 2017 17:11:58 +0100 (CET) Received: (qmail 25666 invoked by uid 500); 16 Feb 2017 16:11:57 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 25613 invoked by uid 99); 16 Feb 2017 16:11:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2017 16:11:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 43107DFC63; Thu, 16 Feb 2017 16:11:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Message-Id: <272c1fb1e0d54a1f826e23325fb76ee4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport Date: Thu, 16 Feb 2017 16:11:57 +0000 (UTC) archived-at: Thu, 16 Feb 2017 16:11:59 -0000 Repository: flink Updated Branches: refs/heads/release-1.2 d3f2fe262 -> a2853ec15 [FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport Akka's AskSupport trait requires that failures are wrapped in an akka.actor.Status.Failure to be recognized. 
Internally the trait will unwrap the failure and wrap it in a scala.util.Failure instance. However, it does not recognize the scala Failure when given to the AskSupport trait. As a consequence it would wrap scala.util.Failure in a scala.util.Success instance. This closes #3324. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2853ec1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2853ec1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2853ec1 Branch: refs/heads/release-1.2 Commit: a2853ec1527dd848d635d9cb7720b2bd21c7e3aa Parents: d3f2fe2 Author: Till Rohrmann Authored: Wed Feb 15 14:16:26 2017 +0100 Committer: Till Rohrmann Committed: Thu Feb 16 17:11:32 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/taskmanager/TaskManager.scala | 21 +- .../runtime/taskmanager/TaskManagerTest.java | 237 ++++++++++++++++++- 2 files changed, 241 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a2853ec1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f1e74cd..37b5e04 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -70,13 +70,12 @@ import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.util._ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} -import 
org.apache.flink.util.{MathUtils, NetUtils} +import org.apache.flink.util.MathUtils import scala.collection.JavaConverters._ import scala.concurrent._ import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.{Failure, Success, Try} /** * The TaskManager is responsible for executing the individual tasks of a Flink job. It is @@ -429,7 +428,7 @@ class TaskManager( futureResponse.mapTo[Boolean].onComplete { // IMPORTANT: In the future callback, we cannot directly modify state // but only send messages to the TaskManager to do those changes - case Success(result) => + case scala.util.Success(result) => if (!result) { self ! decorateMessage( FailTask( @@ -438,7 +437,7 @@ class TaskManager( ) } - case Failure(t) => + case scala.util.Failure(t) => self ! decorateMessage( FailTask( executionID, @@ -476,7 +475,7 @@ class TaskManager( sender ! decorateMessage(Acknowledge.get()) } catch { case t: Throwable => - sender ! decorateMessage(Failure(t)) + sender ! decorateMessage(Status.Failure(t)) } } else { log.debug(s"Cannot find task to stop for execution ${executionID})") @@ -760,8 +759,6 @@ class TaskManager( // ---- Done ---- log.debug(s"Done with stack trace sample $sampleId.") - - sender ! new StackTraceSampleResponse(sampleId, executionId, currentTraces) } @@ -779,7 +776,7 @@ class TaskManager( } } catch { case e: Exception => - sender ! Failure(e) + sender ! decorateMessage(Status.Failure(e)) } case _ => unhandled(message) @@ -839,10 +836,10 @@ class TaskManager( client.put(fis); }(context.dispatcher) .onComplete { - case Success(value) => + case scala.util.Success(value) => sender ! value fis.close() - case Failure(e) => + case scala.util.Failure(e) => sender ! akka.actor.Status.Failure(e) fis.close() }(context.dispatcher) @@ -1203,7 +1200,7 @@ class TaskManager( catch { case t: Throwable => log.error("SubmitTask failed", t) - sender ! decorateMessage(Failure(t)) + sender ! 
decorateMessage(Status.Failure(t)) } } @@ -1257,7 +1254,7 @@ class TaskManager( if (errors.isEmpty) { sender ! decorateMessage(Acknowledge.get()) } else { - sender ! decorateMessage(Failure(new Exception(errors.mkString("\n")))) + sender ! decorateMessage(Status.Failure(new Exception(errors.mkString("\n")))) } case None => http://git-wip-us.apache.org/repos/asf/flink/blob/a2853ec1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 2fb5fa8..ef9e4bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -481,7 +481,7 @@ public class TaskManagerTest extends TestLogger { expectMsgEquals(Acknowledge.get()); tm.tell(new StopTask(eid2), testActorGateway); - expectMsgClass(Failure.class); + expectMsgClass(Status.Failure.class); assertEquals(ExecutionState.RUNNING, t2.getExecutionState()); @@ -1225,13 +1225,13 @@ public class TaskManagerTest extends TestLogger { // Receive the expected message (heartbeat races possible) Object[] msg = receiveN(1); - while (!(msg[0] instanceof Failure)) { + while (!(msg[0] instanceof Status.Failure)) { msg = receiveN(1); } - Failure response = (Failure) msg[0]; + Status.Failure response = (Status.Failure) msg[0]; - assertEquals(IllegalStateException.class, response.exception().getClass()); + assertEquals(IllegalStateException.class, response.cause().getClass()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -1523,7 +1523,234 @@ public class TaskManagerTest extends TestLogger { } }}; } - + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the submit task + 
* message fails. + */ + @Test + public void testSubmitTaskFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "test task", + 0, // this will make the submission fail because the number of key groups must be >= 1 + 0, + 1, + 0, + new Configuration(), + new Configuration(), + "Foobar", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + + Future submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + try { + Await.result(submitResponse, timeout); + + fail("The submit task message should have failed."); + } catch (IllegalArgumentException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the stop task + * message fails. 
+ */ + @Test + public void testStopTaskFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + executionAttemptId, + new SerializedValue<>(new ExecutionConfig()), + "test task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + Tasks.BlockingNoOpInvokable.class.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + + Future submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + Await.result(submitResponse, timeout); + + Future stopResponse = taskManager.ask(new StopTask(executionAttemptId), timeout); + + try { + Await.result(stopResponse, timeout); + + fail("The stop task message should have failed."); + } catch (UnsupportedOperationException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack + * trace message fails. 
+ */ + @Test + public void testStackTraceSampleFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + Future stackTraceResponse = taskManager.ask( + new TriggerStackTraceSample( + 0, + new ExecutionAttemptID(), + 0, + Time.milliseconds(1L), + 0), + timeout); + + try { + Await.result(stackTraceResponse, timeout); + + fail("The trigger stack trace message should have failed."); + } catch (IllegalStateException e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + + /** + * Tests that the TaskManager sends a proper exception back to the sender if the update task + * input partitions message fails. + */ + @Test + public void testUpdateTaskInputPartitionsFailure() throws Exception { + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + try { + + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor( + new JobID(), + "test job", + new JobVertexID(), + executionAttemptId, + new SerializedValue<>(new ExecutionConfig()), + "test task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + Tasks.BlockingNoOpInvokable.class.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + + Future submitResponse = taskManager.ask(new SubmitTask(tdd), timeout); + + 
Await.result(submitResponse, timeout); + + Future partitionUpdateResponse = taskManager.ask( + new TaskMessages.UpdateTaskSinglePartitionInfo( + executionAttemptId, + new IntermediateDataSetID(), + new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())), + timeout); + + try { + Await.result(partitionUpdateResponse, timeout); + + fail("The update task input partitions message should have failed."); + } catch (Exception e) { + // expected + } + } finally { + TestingUtils.stopActor(jobManager); + TestingUtils.stopActor(taskManager); + } + } + // -------------------------------------------------------------------------------------------- public static class SimpleJobManager extends FlinkUntypedActor {