flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport
Date Thu, 16 Feb 2017 16:11:57 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 d3f2fe262 -> a2853ec15


[FLINK-5773] Use akka.actor.Status.Failure class to send failures via AskSupport

Akka's AskSupport trait requires that failures are wrapped in an akka.actor.Status.Failure
to be recognized. Internally the trait will unwrap the failure and wrap it in a
scala.util.Failure instance. However, it does not recognize a scala.util.Failure that is
sent to it directly as the response. As a consequence it would wrap the scala.util.Failure
in a scala.util.Success instance.

This closes #3324.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2853ec1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2853ec1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2853ec1

Branch: refs/heads/release-1.2
Commit: a2853ec1527dd848d635d9cb7720b2bd21c7e3aa
Parents: d3f2fe2
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Feb 15 14:16:26 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 16 17:11:32 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  21 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 237 ++++++++++++++++++-
 2 files changed, 241 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2853ec1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f1e74cd..37b5e04 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -70,13 +70,12 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{MathUtils, NetUtils}
+import org.apache.flink.util.MathUtils
 
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -429,7 +428,7 @@ class TaskManager(
               futureResponse.mapTo[Boolean].onComplete {
                 // IMPORTANT: In the future callback, we cannot directly modify state
                 //            but only send messages to the TaskManager to do those changes
-                case Success(result) =>
+                case scala.util.Success(result) =>
                   if (!result) {
                   self ! decorateMessage(
                     FailTask(
@@ -438,7 +437,7 @@ class TaskManager(
                     )
                   }
 
-                case Failure(t) =>
+                case scala.util.Failure(t) =>
                 self ! decorateMessage(
                   FailTask(
                     executionID,
@@ -476,7 +475,7 @@ class TaskManager(
               sender ! decorateMessage(Acknowledge.get())
             } catch {
               case t: Throwable =>
-                sender ! decorateMessage(Failure(t))
+                sender ! decorateMessage(Status.Failure(t))
             }
           } else {
             log.debug(s"Cannot find task to stop for execution ${executionID})")
@@ -760,8 +759,6 @@ class TaskManager(
                   // ---- Done ----
                   log.debug(s"Done with stack trace sample $sampleId.")
 
-
-
                   sender ! new StackTraceSampleResponse(sampleId, executionId, currentTraces)
                 }
 
@@ -779,7 +776,7 @@ class TaskManager(
           }
         } catch {
           case e: Exception =>
-            sender ! Failure(e)
+            sender ! decorateMessage(Status.Failure(e))
         }
 
       case _ => unhandled(message)
@@ -839,10 +836,10 @@ class TaskManager(
             client.put(fis);
           }(context.dispatcher)
             .onComplete {
-              case Success(value) =>
+              case scala.util.Success(value) =>
                 sender ! value
                 fis.close()
-              case Failure(e) =>
+              case scala.util.Failure(e) =>
                 sender ! akka.actor.Status.Failure(e)
                 fis.close()
             }(context.dispatcher)
@@ -1203,7 +1200,7 @@ class TaskManager(
     catch {
       case t: Throwable =>
         log.error("SubmitTask failed", t)
-        sender ! decorateMessage(Failure(t))
+        sender ! decorateMessage(Status.Failure(t))
     }
   }
 
@@ -1257,7 +1254,7 @@ class TaskManager(
         if (errors.isEmpty) {
           sender ! decorateMessage(Acknowledge.get())
         } else {
-          sender ! decorateMessage(Failure(new Exception(errors.mkString("\n"))))
+          sender ! decorateMessage(Status.Failure(new Exception(errors.mkString("\n"))))
         }
 
       case None =>

http://git-wip-us.apache.org/repos/asf/flink/blob/a2853ec1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 2fb5fa8..ef9e4bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -481,7 +481,7 @@ public class TaskManagerTest extends TestLogger {
 							expectMsgEquals(Acknowledge.get());
 
 							tm.tell(new StopTask(eid2), testActorGateway);
-							expectMsgClass(Failure.class);
+							expectMsgClass(Status.Failure.class);
 
 							assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
 
@@ -1225,13 +1225,13 @@ public class TaskManagerTest extends TestLogger {
 
 							// Receive the expected message (heartbeat races possible)
 							Object[] msg = receiveN(1);
-							while (!(msg[0] instanceof Failure)) {
+							while (!(msg[0] instanceof Status.Failure)) {
 								msg = receiveN(1);
 							}
 
-							Failure response = (Failure) msg[0];
+							Status.Failure response = (Status.Failure) msg[0];
 
-							assertEquals(IllegalStateException.class, response.exception().getClass());
+							assertEquals(IllegalStateException.class, response.cause().getClass());
 						} catch (Exception e) {
 							e.printStackTrace();
 							fail(e.getMessage());
@@ -1523,7 +1523,234 @@ public class TaskManagerTest extends TestLogger {
 			}
 		}};
 	}
-	
+
+	/**
+	 * Tests that the TaskManager sends a proper exception back to the sender if the submit task
+	 * message fails.
+	 */
+	@Test
+	public void testSubmitTaskFailure() throws Exception {
+		ActorGateway jobManager = null;
+		ActorGateway taskManager = null;
+
+		try {
+
+			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+			jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+			taskManager = TestingUtils.createTaskManager(
+				system,
+				jobManager,
+				new Configuration(),
+				true,
+				true);
+
+			TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+				new JobID(),
+				"test job",
+				new JobVertexID(),
+				new ExecutionAttemptID(),
+				new SerializedValue<>(new ExecutionConfig()),
+				"test task",
+				0, // this will make the submission fail because the number of key groups must be >= 1
+				0,
+				1,
+				0,
+				new Configuration(),
+				new Configuration(),
+				"Foobar",
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				0);
+
+			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+			try {
+				Await.result(submitResponse, timeout);
+
+				fail("The submit task message should have failed.");
+			} catch (IllegalArgumentException e) {
+				// expected
+			}
+		} finally {
+			TestingUtils.stopActor(jobManager);
+			TestingUtils.stopActor(taskManager);
+		}
+	}
+
+	/**
+	 * Tests that the TaskManager sends a proper exception back to the sender if the stop task
+	 * message fails.
+	 */
+	@Test
+	public void testStopTaskFailure() throws Exception {
+		ActorGateway jobManager = null;
+		ActorGateway taskManager = null;
+
+		try {
+			final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+			jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+			taskManager = TestingUtils.createTaskManager(
+				system,
+				jobManager,
+				new Configuration(),
+				true,
+				true);
+
+			TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+				new JobID(),
+				"test job",
+				new JobVertexID(),
+				executionAttemptId,
+				new SerializedValue<>(new ExecutionConfig()),
+				"test task",
+				1,
+				0,
+				1,
+				0,
+				new Configuration(),
+				new Configuration(),
+				Tasks.BlockingNoOpInvokable.class.getName(),
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				0);
+
+			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+			Await.result(submitResponse, timeout);
+
+			Future<Object> stopResponse = taskManager.ask(new StopTask(executionAttemptId), timeout);
+
+			try {
+				Await.result(stopResponse, timeout);
+
+				fail("The stop task message should have failed.");
+			} catch (UnsupportedOperationException e) {
+				// expected
+			}
+		} finally {
+			TestingUtils.stopActor(jobManager);
+			TestingUtils.stopActor(taskManager);
+		}
+	}
+
+	/**
+	 * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack
+	 * trace message fails.
+	 */
+	@Test
+	public void testStackTraceSampleFailure() throws Exception {
+		ActorGateway jobManager = null;
+		ActorGateway taskManager = null;
+
+		try {
+
+			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+			jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+			taskManager = TestingUtils.createTaskManager(
+				system,
+				jobManager,
+				new Configuration(),
+				true,
+				true);
+
+			Future<Object> stackTraceResponse = taskManager.ask(
+				new TriggerStackTraceSample(
+					0,
+					new ExecutionAttemptID(),
+					0,
+					Time.milliseconds(1L),
+					0),
+				timeout);
+
+			try {
+				Await.result(stackTraceResponse, timeout);
+
+				fail("The trigger stack trace message should have failed.");
+			} catch (IllegalStateException e) {
+				// expected
+			}
+		} finally {
+			TestingUtils.stopActor(jobManager);
+			TestingUtils.stopActor(taskManager);
+		}
+	}
+
+	/**
+	 * Tests that the TaskManager sends a proper exception back to the sender if the trigger stack
+	 * trace message fails.
+	 */
+	@Test
+	public void testUpdateTaskInputPartitionsFailure() throws Exception {
+		ActorGateway jobManager = null;
+		ActorGateway taskManager = null;
+
+		try {
+
+			final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+			ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+			jobManager = new AkkaActorGateway(jm, leaderSessionID);
+
+			taskManager = TestingUtils.createTaskManager(
+				system,
+				jobManager,
+				new Configuration(),
+				true,
+				true);
+
+			TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+				new JobID(),
+				"test job",
+				new JobVertexID(),
+				executionAttemptId,
+				new SerializedValue<>(new ExecutionConfig()),
+				"test task",
+				1,
+				0,
+				1,
+				0,
+				new Configuration(),
+				new Configuration(),
+				Tasks.BlockingNoOpInvokable.class.getName(),
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				0);
+
+			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
+
+			Await.result(submitResponse, timeout);
+
+			Future<Object> partitionUpdateResponse = taskManager.ask(
+				new TaskMessages.UpdateTaskSinglePartitionInfo(
+					executionAttemptId,
+					new IntermediateDataSetID(),
+					new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())),
+				timeout);
+
+			try {
+				Await.result(partitionUpdateResponse, timeout);
+
+				fail("The update task input partitions message should have failed.");
+			} catch (Exception e) {
+				// expected
+			}
+		} finally {
+			TestingUtils.stopActor(jobManager);
+			TestingUtils.stopActor(taskManager);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public static class SimpleJobManager extends FlinkUntypedActor {


Mime
View raw message