flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/3] flink git commit: [FLINK-2732] Display TM logs in Dashboard
Date Mon, 18 Apr 2016 09:28:30 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5f993c65e -> 6d53bbc4b


http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.html
index c89970e..a63e747 100644
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.html
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.html
@@ -17,15 +17,17 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 -->
-<nav ng-if="metrics.id" class="navbar navbar-default navbar-fixed-top navbar-main">
+<nav class="navbar navbar-default navbar-fixed-top navbar-main">
   <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div>
   <div class="navbar-title">Task Manager</div>
   <div class="navbar-info first last">Last Heartbeat: {{ metrics.timeSinceLastHeartbeat | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</div>
   <div class="navbar-info last first hidden-xs hidden-sm">{{metrics.path}}</div>
 </nav>
-<nav ng-if="metrics.id" class="navbar navbar-default navbar-fixed-top navbar-main-additional">
+<nav class="navbar navbar-default navbar-fixed-top navbar-main-additional">
   <ul class="nav nav-tabs">
     <li ui-sref-active="active"><a ui-sref=".metrics">Metrics</a></li>
+    <li ui-sref-active="active"><a ui-sref=".log">Logs</a></li>
+    <li ui-sref-active="active"><a ui-sref=".stdout">Stdout</a></li>
   </ul>
 </nav>
 <div id="content-inner" class="has-navbar-main-additional">

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.log.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.log.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.log.html
new file mode 100644
index 0000000..84fb36e
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.log.html
@@ -0,0 +1,39 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<table class="table table-properties">
+  <thead>
+    <tr>
+      <th colspan="2">
+        <div class="row">
+          <div class="col-xs-10">Task Manager Logs</div>
+          <div class="col-xs-1 text-right"><a ng-click="reloadData()" class="show-pointer"><i class="fa fa-refresh"></i></a></div>
+          <div class="col-xs-1 text-left"><a ng-click="downloadData()" class="show-pointer"><i class="fa fa-download"></i></a></div>
+        </div>
+      </th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td colspan="2">
+        <pre>{{log}}</pre>
+      </td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
new file mode 100644
index 0000000..31d79af
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.stdout.html
@@ -0,0 +1,39 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<table class="table table-properties">
+  <thead>
+    <tr>
+      <th colspan="2">
+        <div class="row">
+          <div class="col-xs-10">Task Manager Output</div>
+          <div class="col-xs-1 text-right"><a ng-click="reloadData()" class="show-pointer"><i class="fa fa-refresh"></i></a></div>
+          <div class="col-xs-1 text-left"><a ng-click="downloadData()" class="show-pointer"><i class="fa fa-download"></i></a></div>
+        </div>
+      </th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td colspan="2">
+        <pre>{{stdout}}</pre>
+      </td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index ede4d9b..4d33364 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -195,6 +195,17 @@ public final class BlobCache implements BlobService {
 		}
 	}
 
+	/**
+	 * Deletes the file associated with the given key from the BLOB cache and BLOB server.
+	 * @param key referring to the file to be deleted
+	 */
+	public void deleteGlobal(BlobKey key) throws IOException {
+		delete(key);
+		BlobClient bc = createClient();
+		bc.delete(key);
+		bc.close();
+	}
+
 	@Override
 	public int getPort() {
 		return serverAddress.getPort();
@@ -228,6 +239,11 @@ public final class BlobCache implements BlobService {
 		}
 	}
 
+	@Override
+	public BlobClient createClient() throws IOException {
+		return new BlobClient(serverAddress);
+	}
+
 	public File getStorageDir() {
 		return this.storageDir;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 20bead0..8fcc024 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.URL;
 import java.util.ArrayList;
@@ -323,6 +324,11 @@ public class BlobServer extends Thread implements BlobService {
 		}
 	}
 
+	@Override
+	public BlobClient createClient() throws IOException {
+		return new BlobClient(new InetSocketAddress(serverSocket.getInetAddress(), getPort()));
+	}
+
 	/**
 	 * Method which retrieves the URL of a file associated with a blob key. The blob server looks
 	 * the blob key up in its local storage. If the file exists, then the URL is returned. If the

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index e6ea8d3..419ee8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -54,4 +54,6 @@ public interface BlobService {
 	 * Shutdown method which is called to terminate the blob service.
 	 */
 	void shutdown();
+	
+	BlobClient createClient() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 9ffaca0..2d99245 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -116,6 +116,20 @@ object TaskManagerMessages {
     */
   case class JobManagerLeaderAddress(jobManagerAddress: String, leaderSessionID: UUID)
 
+  /** Trait to differentiate which log file is requested */
+  sealed trait LogTypeRequest
+
+  /** Indicates a request for the .log file */
+  case object LogFileRequest extends LogTypeRequest
+
+  /** Indicates a request for the .out file */
+  case object StdOutFileRequest extends LogTypeRequest
+
+  /** Requests the TaskManager to upload either its log or stdout file to the Blob store.
+    * @param requestType LogTypeRequest indicating which file is requested
+    */
+  case class RequestTaskManagerLog(requestType : LogTypeRequest)
+
 
   // --------------------------------------------------------------------------
   //  Utility getters for case objects to simplify access from Java
@@ -137,4 +151,19 @@ object TaskManagerMessages {
   def getRegisteredAtJobManagerMessage:
             RegisteredAtJobManager.type = RegisteredAtJobManager
 
+  /**
+    * Accessor for the log file request message, to simplify Java interoperability.
+    * @return A RequestTaskManagerLog message requesting the .log file.
+    */
+  def getRequestTaskManagerLog(): AnyRef = {
+    RequestTaskManagerLog(LogFileRequest)
+  }
+
+  /**
+    * Accessor for the stdout file request message, to simplify Java interoperability.
+    * @return A RequestTaskManagerLog message requesting the .out file.
+    */
+  def getRequestTaskManagerStdout(): AnyRef = {
+    RequestTaskManagerLog(StdOutFileRequest)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 49953a6..8e1b751 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager
 
-import java.io.{File, IOException}
+import java.io.{FileInputStream, File, IOException}
 import java.lang.management.{ManagementFactory, OperatingSystemMXBean}
 import java.lang.reflect.Method
 import java.net.{InetAddress, InetSocketAddress}
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.blob.{BlobCache, BlobService}
+import org.apache.flink.runtime.blob.{BlobKey, BlobClient, BlobCache, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
 import org.apache.flink.runtime.execution.ExecutionState
@@ -319,6 +319,14 @@ class TaskManager(
 
     case FatalError(message, cause) =>
       killTaskManagerFatal(message, cause)
+
+    case RequestTaskManagerLog(requestType : LogTypeRequest) =>
+      blobService match {
+        case Some(_) =>
+          handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
+        case None =>
+          sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
+      }
   }
 
   /**
@@ -784,6 +792,39 @@ class TaskManager(
     }
   }
 
+  private def handleRequestTaskManagerLog(
+      sender: ActorRef,
+      requestType: LogTypeRequest,
+      jobManager: ActorRef)
+    : Unit = {
+    val logFilePathOption = Option(config.configuration.getString(
+      ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")));
+    logFilePathOption match {
+      case None => throw new IOException("TaskManager log files are unavailable. " +
+        "Log file location not found in environment variable log.file or configuration key "
+        + ConfigConstants.TASK_MANAGER_LOG_PATH_KEY + ".");
+      case Some(logFilePath) =>
+        val file: File = requestType match {
+          case LogFileRequest => new File(logFilePath);
+          case StdOutFileRequest =>
+            new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
+        }
+        val fis = new FileInputStream(file);
+        Future {
+          val client: BlobClient = blobService.get.createClient()
+          client.put(fis);
+        }(context.dispatcher)
+          .onComplete {
+            case Success(value) => 
+              sender ! value
+              fis.close()
+            case Failure(e) =>
+              sender ! e
+              fis.close()
+          }(context.dispatcher)
+    }
+  }
+
   // --------------------------------------------------------------------------
   //  Task Manager / ResourceManager / JobManager association and initialization
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 2963418..a5112ec 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -140,6 +140,8 @@ public class TestBaseUtils extends TestLogger {
 
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+		
+		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
 
 		ForkableFlinkMiniCluster cluster =  new ForkableFlinkMiniCluster(config, singleActorSystem);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 31084a0..e7b37d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -84,6 +84,7 @@ public class WebFrontendITCase {
 		Files.createFile(outFile.toPath());
 		
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath());
+		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
 
 		cluster = new ForkableFlinkMiniCluster(config, false);
 		cluster.start();
@@ -159,6 +160,34 @@ public class WebFrontendITCase {
 	}
 
 	@Test
+	public void getTaskManagerLogAndStdoutFiles() {
+		try {
+			String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+
+			ObjectMapper mapper = new ObjectMapper();
+			JsonNode parsed = mapper.readTree(json);
+			ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
+			JsonNode taskManager = taskManagers.get(0);
+			String id = taskManager.get("id").asText();
+			
+			WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+			
+			//we check for job manager log files, since no separate taskmanager logs exist
+			FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
+			String logs = getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
+			assertTrue(logs.contains("job manager log"));
+
+			FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
+			logs = getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
+			assertTrue(logs.contains("job manager out"));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void getConfiguration() {
 		try {
 			String config = getFromHTTP("http://localhost:" + port + "/jobmanager/config");


Mime
View raw message