flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [03/12] flink git commit: [FLINK-3310] [runtime-web] Add back pressure statistics to web monitor (backend)
Date Mon, 08 Feb 2016 14:05:36 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
new file mode 100644
index 0000000..2b8f804
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
+import org.junit.Test;
+import scala.Option;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for back pressure handler responses.
+ */
+public class JobVertexBackPressureHandlerTest {
+
+	/** Tests the response when no stats are available */
+	@Test
+	public void testResponseNoStatsAvailable() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Option.<OperatorBackPressureStats>empty());
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				statsTracker,
+				9999);
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Single element
+		assertEquals(1, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		assertEquals("deprecated", status.textValue());
+
+		verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
+
+	/** Tests the response when stats are available */
+	@Test
+	public void testResponseStatsAvailable() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		OperatorBackPressureStats stats = new OperatorBackPressureStats(
+				0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Option.apply(stats));
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				statsTracker,
+				9999);
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Four elements (status, level, timestamp, subtasks)
+		assertEquals(4, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		assertEquals("ok", status.textValue());
+
+		// Back pressure level
+		JsonNode backPressureLevel = rootNode.get("backpressure-level");
+		assertNotNull(backPressureLevel);
+		assertEquals("high", backPressureLevel.textValue());
+
+		// End time stamp
+		JsonNode endTimeStamp = rootNode.get("end-timestamp");
+		assertNotNull(endTimeStamp);
+		assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+		// Subtasks
+		JsonNode subTasks = rootNode.get("subtasks");
+		assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+		for (int i = 0; i < subTasks.size(); i++) {
+			JsonNode subTask = subTasks.get(i);
+
+			JsonNode index = subTask.get("subtask");
+			assertEquals(i, index.intValue());
+
+			JsonNode level = subTask.get("backpressure-level");
+			assertEquals(JobVertexBackPressureHandler
+					.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+			JsonNode ratio = subTask.get("ratio");
+			assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+		}
+
+		// Verify not triggered
+		verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
+
+	/** Tests that after the refresh interval another sample is triggered. */
+	@Test
+	public void testResponsePassedRefreshInterval() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		OperatorBackPressureStats stats = new OperatorBackPressureStats(
+				0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Option.apply(stats));
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				statsTracker,
+				0); // <----- refresh interval should fire immediately
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Four elements (status, level, timestamp, subtasks)
+		assertEquals(4, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		// Interval passed, hence deprecated
+		assertEquals("deprecated", status.textValue());
+
+		// Back pressure level
+		JsonNode backPressureLevel = rootNode.get("backpressure-level");
+		assertNotNull(backPressureLevel);
+		assertEquals("high", backPressureLevel.textValue());
+
+		// End time stamp
+		JsonNode endTimeStamp = rootNode.get("end-timestamp");
+		assertNotNull(endTimeStamp);
+		assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+		// Subtasks
+		JsonNode subTasks = rootNode.get("subtasks");
+		assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+		for (int i = 0; i < subTasks.size(); i++) {
+			JsonNode subTask = subTasks.get(i);
+
+			JsonNode index = subTask.get("subtask");
+			assertEquals(i, index.intValue());
+
+			JsonNode level = subTask.get("backpressure-level");
+			assertEquals(JobVertexBackPressureHandler
+					.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+			JsonNode ratio = subTask.get("ratio");
+			assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+		}
+
+		// Verify triggered
+		verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a03f0bf..c3bfbb0 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1089,8 +1089,7 @@ public class ExecutionGraph implements Serializable {
 
 			// We don't clean the checkpoint stats tracker, because we want
 			// it to be available after the job has terminated.
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			LOG.error("Error while cleaning up after execution", e);
 		}
 
@@ -1100,8 +1099,7 @@ public class ExecutionGraph implements Serializable {
 			if (coord != null) {
 				coord.shutdown();
 			}
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			LOG.error("Error while cleaning up after execution", e);
 		}
 	}
@@ -1231,7 +1229,6 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 	
-	
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
 		if (jobStatusListenerActors.size() > 0) {
 			ExecutionGraphMessages.JobStatusChanged message =

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c89a01e..165dce4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -471,7 +471,17 @@ public class ExecutionVertex implements Serializable {
 		this.currentExecution.fail(t);
 	}
 
-	public void sendMessageToCurrentExecution(Serializable message, ExecutionAttemptID attemptID)
{
+	public boolean sendMessageToCurrentExecution(
+			Serializable message,
+			ExecutionAttemptID attemptID) {
+
+		return sendMessageToCurrentExecution(message, attemptID, null);
+	}
+
+	public boolean sendMessageToCurrentExecution(
+			Serializable message,
+			ExecutionAttemptID attemptID,
+			ActorGateway sender) {
 		Execution exec = getCurrentExecutionAttempt();
 		
 		// check that this is for the correct execution attempt
@@ -482,16 +492,26 @@ public class ExecutionVertex implements Serializable {
 			if (slot != null) {
 				ActorGateway gateway = slot.getInstance().getActorGateway();
 				if (gateway != null) {
-					gateway.tell(message);
+					if (sender == null) {
+						gateway.tell(message);
+					} else {
+						gateway.tell(message, sender);
+					}
+
+					return true;
+				} else {
+					return false;
 				}
 			}
 			else {
 				LOG.debug("Skipping message to undeployed task execution {}/{}", getSimpleName(), attemptID);
+				return false;
 			}
 		}
 		else {
 			LOG.debug("Skipping message to {}/{} because it does not match the current execution",
 					getSimpleName(), attemptID);
+			return false;
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index df7e6c6..4410ec3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -110,7 +110,7 @@ class JobManager(
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
-  extends FlinkActor 
+  extends FlinkActor
  with LeaderSessionMessageFilter // mixin order is important, we want filtering after logging
   with LogMessages // mixin order is important, we want first logging
   with LeaderContender

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
new file mode 100644
index 0000000..9f2e6e9
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+import akka.actor.ActorRef
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+  * A set of messages exchanged with task manager instances in order to sample
+  * the stack traces of running tasks.
+  */
+object StackTraceSampleMessages {
+
+  trait StackTraceSampleMessages
+
+  /**
+    * Triggers the sampling of a running task (sent by the job manager to the
+    * task managers).
+    *
+    * @param sampleId ID of this sample.
+    * @param executionId ID of the task to sample.
+    * @param numSamples Number of stack trace samples to collect.
+    * @param delayBetweenSamples Delay between consecutive samples.
+    * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
+    *                           no maximum and collects the complete stack
+    *                           trace.
+    */
+  case class TriggerStackTraceSample(
+      sampleId: Int,
+      executionId: ExecutionAttemptID,
+      numSamples: Int,
+      delayBetweenSamples: FiniteDuration,
+      maxStackTraceDepth: Int = 0)
+    extends StackTraceSampleMessages with java.io.Serializable
+
+  /**
+    * Response after a successful stack trace sample (sent by the task managers
+    * to the job manager).
+    *
+    * @param sampleId ID of this sample.
+    * @param executionId ID of the sampled task.
+    * @param samples Stack trace samples (head is most recent sample).
+    */
+  case class ResponseStackTraceSampleSuccess(
+      sampleId: Int,
+      executionId: ExecutionAttemptID,
+      samples: java.util.List[Array[StackTraceElement]])
+    extends StackTraceSampleMessages
+
+  /**
+    * Response after a failed stack trace sample (sent by the task managers to
+    * the job manager).
+    *
+    * @param sampleId ID of this sample.
+    * @param executionId ID of the sampled task.
+    * @param cause Failure cause.
+    */
+  case class ResponseStackTraceSampleFailure(
+      sampleId: Int,
+      executionId: ExecutionAttemptID,
+      cause: Exception)
+    extends StackTraceSampleMessages
+
+  /**
+    * Task manager internal sample message.
+    *
+    * @param sampleId ID of this sample.
+    * @param executionId ID of the task to sample.
+    * @param delayBetweenSamples Delay between consecutive samples.
+    * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
+    *                           no maximum and collects the complete stack
+    *                           trace.
+    * @param numRemainingSamples Number of remaining samples before this
+    *                            sample is finished.
+    * @param currentTraces The current list of gathered stack traces.
+    * @param sender Actor triggering this sample (receiver of result).
+    */
+  case class SampleTaskStackTrace(
+      sampleId: Int,
+      executionId: ExecutionAttemptID,
+      delayBetweenSamples: FiniteDuration,
+      maxStackTraceDepth: Int,
+      numRemainingSamples: Int,
+      currentTraces: java.util.List[Array[StackTraceElement]],
+      sender: ActorRef)
+    extends StackTraceSampleMessages
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5ad9dad..3b68878 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -19,34 +19,29 @@
 package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, IOException}
+import java.lang.management.{ManagementFactory, OperatingSystemMXBean}
+import java.lang.reflect.Method
 import java.net.{InetAddress, InetSocketAddress}
+import java.util
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-import java.lang.reflect.Method
-import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
 
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
 import _root_.akka.util.Timeout
-
-import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.codahale.metrics.json.MetricsModule
-import com.codahale.metrics.jvm.{BufferPoolMetricSet, MemoryUsageGaugeSet, GarbageCollectorMetricSet}
-
+import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, MemoryUsageGaugeSet}
+import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
-
 import org.apache.flink.configuration._
-import org.apache.flink.core.memory.{HybridMemorySegment, HeapMemorySegment, MemorySegmentFactory,
MemoryType}
+import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory,
MemoryType}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
-import org.apache.flink.runtime.messages.TaskMessages._
-import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint,
AbstractCheckpointMessage}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.blob.{BlobService, BlobCache}
+import org.apache.flink.runtime.blob.{BlobCache, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager,
LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
@@ -56,24 +51,28 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.memory.MemoryManager 
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
+import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.{SampleTaskStackTrace,
ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, StackTraceSampleMessages,
TriggerStackTraceSample}
 import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.util.NetUtils
+import org.apache.flink.runtime.messages.TaskMessages._
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete,
TriggerCheckpoint}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
-import org.apache.flink.runtime.util.{SignalHandler, LeaderRetrievalUtils, MathUtils, EnvironmentInformation}
+import org.apache.flink.runtime.util.{EnvironmentInformation, LeaderRetrievalUtils, MathUtils,
SignalHandler}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.util.NetUtils
 
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
-import scala.util.{Failure, Success}
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-
 import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -273,6 +272,9 @@ class TaskManager(
     // registration messages for connecting and disconnecting from / to the JobManager
     case message: RegistrationMessage => handleRegistrationMessage(message)
 
+    // task sampling messages
+    case message: StackTraceSampleMessages => handleStackTraceSampleMessage(message)
+
     // ----- miscellaneous messages ----
 
     // periodic heart beats that transport metrics
@@ -640,6 +642,120 @@ class TaskManager(
     }
   }
 
+  private def handleStackTraceSampleMessage(message: StackTraceSampleMessages): Unit = {
+    message match {
+
+      // Triggers the sampling of a task
+      case TriggerStackTraceSample(
+        sampleId,
+        executionId,
+        numSamples,
+        delayBetweenSamples,
+        maxStackTraceDepth) =>
+
+          log.debug(s"Triggering stack trace sample $sampleId.")
+
+          val senderRef = sender()
+
+          self ! SampleTaskStackTrace(
+            sampleId,
+            executionId,
+            delayBetweenSamples,
+            maxStackTraceDepth,
+            numSamples,
+            new java.util.ArrayList(),
+            senderRef)
+
+      // Repeatedly sent to self to sample a task
+      case SampleTaskStackTrace(
+        sampleId,
+        executionId,
+        delayBetweenSamples,
+        maxStackTraceDepth,
+        remainingNumSamples,
+        currentTraces,
+        sender) =>
+
+        try {
+          if (remainingNumSamples >= 1) {
+            getStackTrace(executionId, maxStackTraceDepth) match {
+              case Some(stackTrace) =>
+
+                currentTraces.add(stackTrace)
+
+                if (remainingNumSamples > 1) {
+                  // ---- Continue ----
+                  val msg = SampleTaskStackTrace(
+                    sampleId,
+                    executionId,
+                    delayBetweenSamples,
+                    maxStackTraceDepth,
+                    remainingNumSamples - 1,
+                    currentTraces,
+                    sender)
+
+                  context.system.scheduler.scheduleOnce(
+                    delayBetweenSamples,
+                    self,
+                    msg)(context.dispatcher)
+                } else {
+                  // ---- Done ----
+                  log.debug(s"Done with stack trace sample $sampleId.")
+
+                  sender ! ResponseStackTraceSampleSuccess(
+                    sampleId,
+                    executionId,
+                    currentTraces)
+                }
+
+              case None =>
+                if (currentTraces.size() == 0) {
+                  throw new IllegalStateException(s"Cannot sample task $executionId. " +
+                    s"Either the task is not known to the task manager or it is not running.")
+                } else {
+                  throw new IllegalStateException(s"Cannot sample task $executionId. " +
+                    s"Task was removed after ${currentTraces.size()} sample(s).")
+                }
+            }
+          } else {
+            throw new IllegalStateException("Non-positive number of remaining samples")
+          }
+        } catch {
+          case e: Exception =>
+            sender ! ResponseStackTraceSampleFailure(sampleId, executionId, e)
+        }
+
+      case _ => unhandled(message)
+    }
+
+    /**
+      * Returns a stack trace of a running task.
+      *
+      * @param executionId ID of the running task.
+      * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
+      *                           no maximum and collects the complete stack
+      *                           trace.
+      * @return Stack trace of the running task.
+      */
+    def getStackTrace(
+      executionId: ExecutionAttemptID,
+      maxStackTraceDepth: Int): Option[Array[StackTraceElement]] = {
+
+      val task = runningTasks.get(executionId)
+
+      if (task != null && task.getExecutionState == ExecutionState.RUNNING) {
+        val stackTrace : Array[StackTraceElement] = task.getExecutingThread.getStackTrace
+
+        if (maxStackTraceDepth > 0) {
+          Option(util.Arrays.copyOfRange(stackTrace, 0, maxStackTraceDepth.min(stackTrace.length)))
+        } else {
+          Option(stackTrace)
+        }
+      } else {
+        Option.empty
+      }
+    }
+  }
 
   // --------------------------------------------------------------------------
   //  Task Manager / JobManager association and initialization

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 17e8e11..f5271df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
-
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
@@ -52,6 +51,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -61,14 +63,12 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
-
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -77,6 +77,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -89,10 +90,11 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartit
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
-public class TaskManagerTest {
+public class TaskManagerTest extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerTest.class);
 	
@@ -118,12 +120,6 @@ public class TaskManagerTest {
 	
 	@Test
 	public void testSubmitAndExecuteTask() {
-		
-		LOG.info(	"--------------------------------------------------------------------\n" + 
-					"     Starting testSubmitAndExecuteTask() \n" + 
-					"--------------------------------------------------------------------");
-		
-		
 		new JavaTestKit(system){{
 
 			ActorGateway taskManager = null;
@@ -233,11 +229,6 @@ public class TaskManagerTest {
 	
 	@Test
 	public void testJobSubmissionAndCanceling() {
-
-		LOG.info(	"--------------------------------------------------------------------\n" +
-					"     Starting testJobSubmissionAndCanceling() \n" +
-					"--------------------------------------------------------------------");
-		
 		new JavaTestKit(system){{
 
 			ActorGateway jobManager = null;
@@ -370,11 +361,6 @@ public class TaskManagerTest {
 	
 	@Test
 	public void testGateChannelEdgeMismatch() {
-
-		LOG.info(	"--------------------------------------------------------------------\n" +
-					"     Starting testGateChannelEdgeMismatch() \n" +
-					"--------------------------------------------------------------------");
-		
 		new JavaTestKit(system){{
 
 			ActorGateway jobManager = null;
@@ -462,11 +448,6 @@ public class TaskManagerTest {
 	
 	@Test
 	public void testRunJobWithForwardChannel() {
-
-		LOG.info(	"--------------------------------------------------------------------\n" +
-					"     Starting testRunJobWithForwardChannel() \n" +
-					"--------------------------------------------------------------------");
-		
 		new JavaTestKit(system){{
 
 			ActorGateway jobManager = null;
@@ -596,11 +577,6 @@ public class TaskManagerTest {
 	
 	@Test
 	public void testCancellingDependentAndStateUpdateFails() {
-
-		LOG.info(	"--------------------------------------------------------------------\n" +
-					"     Starting testCancellingDependentAndStateUpdateFails() \n" +
-					"--------------------------------------------------------------------");
-
 		// this tests creates two tasks. the sender sends data, and fails to send the
 		// state update back to the job manager
 		// the second one blocks to be canceled
@@ -929,6 +905,283 @@ public class TaskManagerTest {
 		}};
 	}
 
+	// ------------------------------------------------------------------------
+	// Stack trace sample
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Tests sampling of task stack traces.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTriggerStackTraceSampleMessage() throws Exception {
+		new JavaTestKit(system) {{
+			ActorGateway taskManagerActorGateway = null;
+			ActorGateway jobManagerActorGateway = TestingUtils.createForwardingJobManager(
+					system,
+					getTestActor(),
+					Option.<String>empty());
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+					getTestActor(),
+					leaderSessionID);
+
+			try {
+				final ActorGateway jobManager = jobManagerActorGateway;
+				final ActorGateway taskManager = TestingUtils.createTaskManager(
+						system,
+						jobManager,
+						new Configuration(),
+						true,
+						false);
+
+				// Registration
+				new Within(d) {
+					@Override
+					protected void run() {
+						expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
+						assertEquals(taskManager.actor(), getLastSender());
+
+						taskManager.tell(new RegistrationMessages.AcknowledgeRegistration(
+								new InstanceID(), 12345), jobManager);
+					}
+				};
+
+				// Single blocking task
+				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+						new ApplicationID(),
+						new JobID(),
+						new JobVertexID(),
+						new ExecutionAttemptID(),
+						"Task",
+						0,
+						1,
+						0,
+						new Configuration(),
+						new Configuration(),
+						Tasks.BlockingNoOpInvokable.class.getName(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
+						Collections.<BlobKey>emptyList(),
+						Collections.<URL>emptyList(),
+						0);
+
+				// Submit the task
+				new Within(d) {
+					@Override
+					protected void run() {
+						try {
+							Future<Object> taskRunningFuture = taskManager.ask(
+									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(
+											tdd.getExecutionId()), timeout);
+
+							taskManager.tell(new SubmitTask(tdd));
+
+							Await.ready(taskRunningFuture, d);
+						} catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+
+				//
+				// 1) Trigger sample for non-existing task
+				//
+				new Within(d) {
+					@Override
+					protected void run() {
+						try {
+							ExecutionAttemptID taskId = new ExecutionAttemptID();
+
+							taskManager.tell(new TriggerStackTraceSample(
+											112223,
+											taskId,
+											100,
+											d,
+											0),
+									testActorGateway);
+
+							// Receive the expected message (heartbeat races possible)
+							Object[] msg = receiveN(1);
+							while (!(msg[0] instanceof ResponseStackTraceSampleFailure)) {
+								msg = receiveN(1);
+							}
+
+							ResponseStackTraceSampleFailure response = (ResponseStackTraceSampleFailure) msg[0];
+
+							assertEquals(112223, response.sampleId());
+							assertEquals(taskId, response.executionId());
+							assertEquals(IllegalStateException.class, response.cause().getClass());
+						} catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+
+				//
+				// 2) Trigger sample for the blocking task
+				//
+				new Within(d) {
+					@Override
+					protected void run() {
+						boolean success = false;
+						Throwable lastError = null;
+
+						for (int i = 0; i < 100 && !success; i++) {
+							try {
+								int numSamples = 5;
+
+								taskManager.tell(new TriggerStackTraceSample(
+												19230,
+												tdd.getExecutionId(),
+												numSamples,
+												new FiniteDuration(100, TimeUnit.MILLISECONDS),
+												0),
+										testActorGateway);
+
+								// Receive the expected message (heartbeat races possible)
+								Object[] msg = receiveN(1);
+								while (!(msg[0] instanceof ResponseStackTraceSampleSuccess)) {
+									msg = receiveN(1);
+								}
+
+								ResponseStackTraceSampleSuccess response = (ResponseStackTraceSampleSuccess) msg[0];
+
+								// ---- Verify response ----
+								assertEquals(19230, response.sampleId());
+								assertEquals(tdd.getExecutionId(), response.executionId());
+
+								List<StackTraceElement[]> traces = response.samples();
+
+								assertEquals("Number of samples", numSamples, traces.size());
+
+								for (StackTraceElement[] trace : traces) {
+									// Look for BlockingNoOpInvokable#invoke
+									for (StackTraceElement elem : trace) {
+										if (elem.getClassName().equals(
+												Tasks.BlockingNoOpInvokable.class.getName())) {
+
+											assertEquals("invoke", elem.getMethodName());
+											success = true;
+											break;
+										}
+									}
+
+									assertTrue("Unexpected stack trace: " +
+											Arrays.toString(trace), success);
+								}
+							} catch (Throwable t) {
+								lastError = t;
+								LOG.warn("Failed to find invokable.", t);
+							}
+
+							try {
+								Thread.sleep(100);
+							} catch (InterruptedException e) {
+								LOG.error("Interrupted while sleeping before retry.", e);
+								break;
+							}
+						}
+
+						if (!success) {
+							if (lastError == null) {
+								fail("Failed to find invokable");
+							} else {
+								fail(lastError.getMessage());
+							}
+						}
+					}
+				};
+
+				//
+				// 3) Trigger sample for the blocking task with max depth
+				//
+				new Within(d) {
+					@Override
+					protected void run() {
+						try {
+							int numSamples = 5;
+							int maxDepth = 2;
+
+							taskManager.tell(new TriggerStackTraceSample(
+											1337,
+											tdd.getExecutionId(),
+											numSamples,
+											new FiniteDuration(100, TimeUnit.MILLISECONDS),
+											maxDepth),
+									testActorGateway);
+
+							// Receive the expected message (heartbeat races possible)
+							Object[] msg = receiveN(1);
+							while (!(msg[0] instanceof ResponseStackTraceSampleSuccess)) {
+								msg = receiveN(1);
+							}
+
+							ResponseStackTraceSampleSuccess response = (ResponseStackTraceSampleSuccess) msg[0];
+
+							// ---- Verify response ----
+							assertEquals(1337, response.sampleId());
+							assertEquals(tdd.getExecutionId(), response.executionId());
+
+							List<StackTraceElement[]> traces = response.samples();
+
+							assertEquals("Number of samples", numSamples, traces.size());
+
+							for (StackTraceElement[] trace : traces) {
+								assertEquals("Max depth", maxDepth, trace.length);
+							}
+						} catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+
+				//
+				// 4) Trigger sample for the blocking task, but cancel it during sampling
+				//
+				new Within(d) {
+					@Override
+					protected void run() {
+						try {
+							// Trigger many samples in order to cancel the task
+							// during a sample
+							taskManager.tell(new TriggerStackTraceSample(
+											0,
+											tdd.getExecutionId(),
+											10000,
+											new FiniteDuration(100, TimeUnit.MILLISECONDS),
+											0),
+									testActorGateway);
+
+							// Cancel the task
+							taskManager.tell(new CancelTask(tdd.getExecutionId()));
+
+							// Receive the expected message (heartbeat races possible)
+							Object[] msg = receiveN(1);
+							while (!(msg[0] instanceof ResponseStackTraceSampleFailure)) {
+								msg = receiveN(1);
+							}
+
+							ResponseStackTraceSampleFailure response = (ResponseStackTraceSampleFailure) msg[0];
+
+							assertEquals(tdd.getExecutionId(), response.executionId());
+							assertEquals(IllegalStateException.class, response.cause().getClass());
+						} catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+			} finally {
+				TestingUtils.stopActor(taskManagerActorGateway);
+				TestingUtils.stopActor(jobManagerActorGateway);
+			}
+		}};
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public static class SimpleJobManager extends FlinkUntypedActor {


Mime
View raw message