flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/9] flink git commit: [FLINK-1501] Add metrics library for monitoring TaskManagers
Date Fri, 27 Mar 2015 10:39:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6b9cee33c -> 2d1f8b07c


http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
index 9956b48..f038a3b 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
@@ -16,38 +16,271 @@
  * limitations under the License.
  */
 
-$(document).ready(function() {
-	pollTaskmanagers();
-	
-});
+
+function convertHex(hex,opacity){
+    hex = hex.replace('#','');
+    r = parseInt(hex.substring(0,2), 16);
+    g = parseInt(hex.substring(2,4), 16);
+    b = parseInt(hex.substring(4,6), 16);
+
+    result = 'rgba('+r+','+g+','+b+','+opacity/100+')';
+    return result;
+}
+
+/**
+ * Copied and modified from: https://github.com/shutterstock/rickshaw/blob/master/src/js/Rickshaw.Fixtures.Number.js
+ **/
+var formatBase1024KMGTP = function(y) {
+    var abs_y = Math.abs(y);
+    if (abs_y >= 1125899906842624)  { return Math.floor(y / 1125899906842624) + "P" }
+    else if (abs_y >= 1099511627776){ return Math.floor(y / 1099511627776) + "T" }
+    else if (abs_y >= 1073741824)   { return Math.floor(y / 1073741824) + "G" }
+    else if (abs_y >= 1048576)      { return Math.floor(y / 1048576) + "M" }
+    else if (abs_y >= 1024)         { return Math.floor(y / 1024) + "K" }
+    else if (abs_y < 1 && y > 0)    { return y.toFixed(2) }
+    else if (abs_y === 0)           { return '' }
+    else                        { return y }
+};
+
+function getUnixTime() {
+	return Math.floor(new Date().getTime()/1000);
+}
+
+// this array contains the history metrics for the taskManagers.
+var taskManagerMemory = [];
+
+// array with the graphs for each taskManager.
+var taskManagerGraph = [];
+
+// array with the latest metrics for all TaskManagers (for the Full metrics view)
+var taskManagerMetrics = [];
+
+// values for the memory charting. In order!
+var memoryValues = ["memory.non-heap.used" , "memory.flink.used", "memory.heap.used" ];
+
+var metricsLimit = 3;
+
+/**
+Create rickshaw graph for the specified taskManager id (tmid).
+**/
+function createGraph(tmId, maxload, maxmem) {
+    var palette = new Rickshaw.Color.Palette({scheme: "spectrum14"} );
+    var series = [];
+    var scales = [];
+    scales.push(d3.scale.linear().domain([0, maxmem]));
+    scales.push(d3.scale.linear().domain([0, maxload]).nice());
+    for(i in memoryValues) {
+        var value = memoryValues[i];
+        taskManagerMemory[tmId][value] = [];
+        series.push({
+            color: convertHex(palette.color(), 90),
+            data: taskManagerMemory[tmId][value],
+            name: value,
+            scale: scales[0],
+            renderer: 'area',
+            stroke: 'rgba(0,0,0,0.5)'
+        });
+    }
+    taskManagerMemory[tmId]["load"] = [];
+    // add load series
+    series.push({
+        color: palette.color(),
+        scale: scales[1],
+        data: taskManagerMemory[tmId]["load"],
+        name: "OS Load",
+        renderer: 'line',
+        stroke: 'rgba(0,0,0,0.5)'
+    });
+
+    // remove message
+    $("#chart-"+tmId).html("");
+    var graph = new Rickshaw.Graph( {
+        element: document.querySelector("#chart-"+tmId),
+        width: 560,
+        height: 250,
+        series: series,
+        renderer: 'multi',
+        stroke: true,
+        min: 0,
+        max: 1
+    } );
+
+    var x_axis = new Rickshaw.Graph.Axis.Time( { graph: graph } );
+
+    var y_axis = new Rickshaw.Graph.Axis.Y.Scaled( {
+        graph: graph,
+        orientation: 'left',
+        scale: scales[0],
+        height: 250,
+        pixelsPerTick: 30,
+        tickSize: 1,
+        tickFormat: formatBase1024KMGTP,
+        element: document.getElementById("y_axis-"+tmId)
+    } );
+
+    var y_axis_load = new Rickshaw.Graph.Axis.Y.Scaled( {
+        graph: graph,
+        orientation: 'right',
+        scale: scales[1],
+        grid: false,
+        element: document.getElementById("y_axis-load-"+tmId)
+    } );
+
+    var hoverDetail = new Rickshaw.Graph.HoverDetail( {
+        graph: graph,
+        yFormatter: formatBase1024KMGTP
+    } );
+
+    var legend = new Rickshaw.Graph.Legend({
+        graph: graph,
+        element: document.querySelector("#legend-"+tmId)
+    });
+
+    var tableBox = $("#tm-row-"+tmId+"-memory");
+
+    // make graph resizable
+    var resize = function() {
+        graph.configure({
+            width: tableBox.innerWidth() - $(".y_axis").width() - 80
+        });
+        graph.render();
+    }
+    setTimeout(resize, 1000);
+    resize();
+    window.addEventListener('resize', resize);
+
+    return graph;
+}
+
+function drawOrUpdateGCStats(tmId, metrics) {
+    var gcs = [];
+    for(var key in metrics.gauges) {
+        var pat = /gc.([^.]+).(count|time)/
+        if(pat.test(key)) {
+            var matches = key.match(pat);
+            if($.inArray(matches[1], gcs) == -1) {
+                gcs.push(matches[1]);
+            }
+        }
+    }
+
+    var html =  "<table class=\"table table-bordered table-hover table-striped\">"+
+                "<tr><td>Name</td><td>Count</td><td>Time</td></tr>";
+    for(var key in gcs) {
+        var gc = gcs[key];
+        html += "<tr><td>"+gc+"</td>";
+        html += "<td>"+metrics.gauges["gc."+gc+".count"].value+"</td>";
+        html += "<td>"+metrics.gauges["gc."+gc+".time"].value+" ms</td></tr>";
+    }
+    html +="</table>";
+    $("#gcStats-"+tmId).html(html);
+}
+
+function getTooltipHTML(txt) {
+    return "<i class=\"fa fa-exclamation-circle\" data-toggle=\"tooltip\" data-placement=\"top\" title=\""+txt+"\"></i>";
+}
 
 /*
  * Initializes taskmanagers table
  */
-function loadTaskmanagers(json) {
-	$("#taskmanagerTable").empty();
-	var table = "<table class=\"table table-bordered table-hover table-striped\">";
-	table += "<tr><th>Node</th><th>Ipc Port</th><th>Data Port</th><th>Seconds since last Heartbeat</th>" +
-			"<th>Number of Slots</th><th>Available Slots</th><th>CPU Cores</th><th>Physical Memory (mb)</th><th>TaskManager Heapsize (mb)</th>" +
-            "<th>Managed Memory (mb)</th><th>Show Stacktrace</th></tr>";
+function processTMdata(json) {
+    var tableHeader = $("#taskmanagerTable-header");
+    $("#page-title").text("Task Managers ("+json.taskmanagers.length+")");
 	for (var i = 0; i < json.taskmanagers.length; i++) {
-		var tm = json.taskmanagers[i]
-		table += "<tr><td>"+tm.inetAdress+"</td><td>"+tm.ipcPort+"</td><td>"+tm.dataPort+"</td><td>"+tm.timeSinceLastHeartbeat+"</td>" +
-				"<td>"+tm.slotsNumber+"</td><td>"+tm.freeSlots+"</td><td>"+tm.cpuCores+"</td><td>"+tm.physicalMemory+"</td><td>"+tm.freeMemory+"</td>" +
-                "<td>"+tm.managedMemory+"</td><td><a href=\"javascript:showStacktraceOfTaskmanager('"+ tm.instanceID +"')\">Show</a></td></tr>";
-	}
-	table += "</table>";
-	$("#taskmanagerTable").append(table);
-}
+		var tm = json.taskmanagers[i];
+		var tmRowIdCssName = "tm-row-"+tm.instanceID;
+		if(!tm.hasOwnProperty("metrics")) {
+		    // metrics not yet received by the JobManager
+		    return;
+		}
+		var metricsJSON = tm.metrics;
+		taskManagerMetrics[tm.instanceID] = metricsJSON;
 
-function pollTaskmanagers() {
-	$.ajax({ url : "setupInfo?get=taskmanagers", type : "GET", cache: false, success : function(json) {
-		loadTaskmanagers(json);
-	}, dataType : "json",
-	});
-	setTimeout(function() {
-		pollTaskmanagers();
-	}, 10000);
+		// check if taskManager has a row
+		tmRow = $("#"+tmRowIdCssName);
+		if(tmRow.length == 0) {
+		    var tmMemoryBox = "<div class=\"chart_container\" id=\"chart_container-"+tm.instanceID+"\">"+
+                                  "<div class=\"y_axis\" id=\"y_axis-"+tm.instanceID+"\"><p class=\"axis_label\">Memory</p></div>"+
+                                  "<div class=\"chart\" id=\"chart-"+tm.instanceID+"\"><i>Waiting for first Heartbeat to arrive</i></div>"+
+                                  "<div class=\"y_axis-load\" id=\"y_axis-load-"+tm.instanceID+"\"><p class=\"axis_label\">Load</p></div>"+
+                               "<div class=\"legend\" id=\"legend-"+tm.instanceID+"\"></div>"+
+                               "</div>";
+
+            var content = "<tr id=\""+tmRowIdCssName+"\">" +
+		                "<td style=\"width:20%\">"+tm.inetAdress+" <br> IPC Port: "+tm.ipcPort+", Data Port: "+tm.dataPort+"</td>" + // first row: TaskManager
+		                "<td id=\""+tmRowIdCssName+"-memory\">"+tmMemoryBox+"</td>" + // second row: memory statistics
+		                "<td id=\""+tmRowIdCssName+"-info\"><i>Loading Information</i></td>" + // Information
+		                "</tr>";
+            var siblings = tableHeader.siblings();
+            if(siblings.length == 0) {
+                tableHeader.after(content);
+            } else {
+                var f = siblings.last();
+                f.after(content);
+            }
+		    var maxmem = metricsJSON.gauges["memory.total.max"].value;
+		    taskManagerMemory[tm.instanceID] = []; // create empty array for TM
+		    taskManagerGraph[tm.instanceID] = createGraph(tm.instanceID, tm.cpuCores*2, maxmem); // cpu cores as load approximation
+		    taskManagerGraph[tm.instanceID].render();
+        //    taskManagerGraph[tm.instanceID].resize();
+		}
+        // fill (update) row with contents
+        // memory statistics
+        var time = getUnixTime();
+        for(memValIdx in memoryValues) {
+            valueKey = memoryValues[memValIdx];
+
+            var flinkMemory = tm.managedMemory * 1024 * 1024;
+            switch(valueKey) {
+                case "memory.heap.used":
+                    var value = metricsJSON.gauges[valueKey].value - flinkMemory;
+                    break;
+                case "memory.non-heap.used":
+                    var value = metricsJSON.gauges[valueKey].value;
+                    break;
+                case "memory.flink.used":
+                    var value = flinkMemory;
+                    break;
+            }
+            taskManagerMemory[tm.instanceID][valueKey].push({x: time, y: value})
+        }
+        // load
+        taskManagerMemory[tm.instanceID]["load"].push({x:time, y:metricsJSON.gauges["load"].value });
+
+        if(metricsLimit == -1 || i < metricsLimit) {
+            taskManagerGraph[tm.instanceID].update();
+        } else {
+            $("#chart_container-"+tm.instanceID).hide();
+        }
+
+
+
+        // info box
+        tmInfoBox = $("#"+tmRowIdCssName+"-info");
+        var slotsInfo = "";
+        if(tm.slotsNumber < tm.cpuCores) {
+            slotsInfo = getTooltipHTML("The number of configured processing slots ("+tm.slotsNumber+") is lower than the "+
+                "number of CPU cores ("+tm.cpuCores+"). For good performance, the number of slots should be at least the number of cores.");
+        }
+        var memoryInfo = "";
+        if(  (tm.managedMemory/tm.physicalMemory) < 0.6 ) {
+            memoryInfo = getTooltipHTML("The amout of memory available to Flink ("+tm.managedMemory+" MB) is much lower than "+
+                "the physical memory available on the machine ("+tm.physicalMemory+" MB). For good performance, Flink should get as much memory as possible.");
+        }
+        tmInfoBox.html("Last Heartbeat: "+tm.timeSinceLastHeartbeat+" seconds ago<br>"+
+            "Processing Slots: "+tm.freeSlots+"/"+tm.slotsNumber+" "+slotsInfo+"<br>"+
+            "Flink Managed Memory: "+tm.managedMemory+" mb "+memoryInfo+"<br>"+
+            "CPU cores: "+tm.cpuCores+" <br>"+
+            "Physical Memory "+tm.physicalMemory+" mb"+
+            "<div id=\"gcStats-"+tm.instanceID+"\"></div>"+
+            "<button type=\"button\" class=\"btn btn-default\" onclick=\"javascript:showStacktraceOfTaskmanager('"+ tm.instanceID +"')\">Show Stacktrace</button> "+
+            "<button type=\"button\" class=\"btn btn-default\" onclick=\"javascript:showAllMetrics('"+ tm.instanceID +"')\">Show all metrics</button>");
+        $(function () {
+            $('[data-toggle="tooltip"]').tooltip()
+        });
+        drawOrUpdateGCStats(tm.instanceID, metricsJSON);
+
+	}
 }
 
 function showStacktraceOfTaskmanager(instanceId) {
@@ -63,7 +296,62 @@ function showStacktraceOfTaskmanager(instanceId) {
             } else if ("errorMessage" in json) {
                 html += "<pre>" + json.errorMessage + "</pre>";
             }
+            $("#taskManagerStackTrace").parent().show();
             $("#taskManagerStackTrace").html(html);
         }
     });
 }
+
+function showAllMetrics(instanceID) {
+    $("#allMetrics").parent().show();
+    $("#allMetrics").html("<h1>All metrics</h1><pre>"+JSON.stringify(taskManagerMetrics[instanceID], undefined, 2)+"</pre>");
+}
+
+
+function updateLimit(element) {
+    switch(element.id) {
+        case 'metrics-limit-3':
+            $("#metrics-limit-all,#metrics-limit-none").removeClass("active");
+            $(element).addClass("active");
+            metricsLimit = 3;
+            hideShowGraphs();
+            break;
+        case 'metrics-limit-all':
+            $("#metrics-limit-3,#metrics-limit-none").removeClass("active");
+            $(element).addClass("active");
+            metricsLimit = -1;
+            hideShowGraphs();
+            break;
+        case 'metrics-limit-none':
+            $("#metrics-limit-all,#metrics-limit-3").removeClass("active");
+            $(element).addClass("active");
+            metricsLimit = 0;
+            hideShowGraphs();
+            break;
+    }
+}
+
+function hideShowGraphs() {
+    var i = 0;
+    for(tmid in taskManagerMemory) {
+       if(metricsLimit == -1 || i++ < metricsLimit) {
+            $("#chart_container-"+tmid).show();
+       } else {
+            $("#chart_container-"+tmid).hide();
+       }
+    }
+}
+
+
+function updateTaskManagers() {
+	$.ajax({ url : "setupInfo?get=taskmanagers", type : "GET", cache: false, success : function(json) {
+		processTMdata(json);
+	}, dataType : "json"
+	});
+}
+
+
+$(document).ready(function() {
+    updateTaskManagers(); // first call
+	setInterval(updateTaskManagers, 5000); // schedule periodic calls.
+});

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html b/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html
index 3a1ad60..5558478 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/taskmanagers.html
@@ -39,9 +39,49 @@ under the License.
     <!-- Scripts from Bootstrap -->
     <script src="js/jquery-2.1.0.js"></script>
     <script src="js/bootstrap.js"></script>
-    
+
+    <!-- Scripts and CSS for the visualization -->
+    <link rel="stylesheet" href="css/rickshaw.min.css">
+    <script src="js/d3.min.js"></script>
+    <script src="js/d3.layout.min.js"></script>
+    <script src="js/rickshaw.min.js"></script>
+
+    <style>
+    .chart_container {
+        position: relative;
+        font-family: Arial, Helvetica, sans-serif;
+    }
+    .chart {
+        position: relative;
+        left: 65px;
+    }
+    .y_axis {
+        position: absolute;
+        top: 0;
+        bottom: 0;
+        width: 60px;
+    }
+    .y_axis-load {
+        position: absolute;
+        top: 0;
+        bottom: 0;
+        width: 60px;
+        right: 0px;
+    }
+    .axis_label {
+        position:absolute;
+        text-align: center;
+        width: inherit;
+    }
+
+    .legend {
+        display: inline-block;
+        vertical-align: top;
+        margin: -50px 0 0 10px;
+    }
+    </style>
+
     <!-- Scripts from Flink -->
-	<script type="text/javascript" src="js/jquery.flot.min.js"></script>
 	<script type="text/javascript" src="js/helpers.js"></script>
 	<script type="text/javascript" src="js/taskmanager.js"></script>
 	<script type="text/javascript" src="js/jcanvas.min.js"></script>
@@ -52,7 +92,7 @@ under the License.
 		$.ajax({ url : "menu?get=taskmanagers", type : "GET", cache: false, success : function(html) {
 			$("#side-menu").empty();
 			$("#side-menu").append(html);
-		}, dataType : "html",
+		}, dataType : "html"
 		});
 	});
   	</script>
@@ -101,17 +141,36 @@ under the License.
 
         <div class="row">
           <div class="col-lg-12">
-            <h1>Task Managers <small>Overview over connected Task Managers</small></h1>
+            <h1 id="page-title">Task Managers</h1>
             <ol class="breadcrumb">
               <li><a href="index.html"><i class="icon-dashboard"></i> Dashboard</a></li>
-              <li class="active"><i class="icon-file-alt"></i> Task Managers</li>
+              <li class="active"><i class="icon-file-alt"></i>Task Managers</li>
             </ol>
           </div>
+          <div class="col-lg-12" id="metricsControl" style="padding-bottom:25px;">
+              <button id="metrics-limit-3" onclick="updateLimit(this)" class="btn btn-default active">Show metrics for 3 TaskManagers</button>
+              <button id="metrics-limit-all" onclick="updateLimit(this)" class="btn btn-default">Show metrics for all TaskManagers</button>
+              <button id="metrics-limit-none" onclick="updateLimit(this)" class="btn btn-default">Disable metrics</button>
+          </div>
+
 		  <div class="col-lg-12">
 	          <div class="table-responsive" id="taskmanagerTable">
+                  <table class="table table-bordered table-hover table-striped">
+                      <tr id="taskmanagerTable-header"><th>TaskManager</th>
+                          <th>Memory Statistics</th>
+                          <th>Information</th>
+                      </tr>
+                  </table>
 	          </div>
 	      </div>
-          <div class="col-lg-12" id="taskManagerStackTrace"></div>
+          <div class="col-lg-12" style="display:none">
+              <i class="fa fa-times" style="position:absolute; right:20px; top:20px;" onclick="$(this).parent().hide();"></i>
+              <div id="allMetrics"></div>
+          </div>
+          <div class="col-lg-12" style="display:none">
+              <i class="fa fa-times" style="position:absolute; right:20px; top:20px;" onclick="$(this).parent().hide();"></i>
+              <div id="taskManagerStackTrace"></div>
+          </div>
         </div><!-- /.row -->
 
       </div><!-- /#page-wrapper -->

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2c39a0a..2112364 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -394,9 +394,9 @@ class JobManager(val configuration: Configuration,
       import scala.collection.JavaConverters._
       sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
 
-    case Heartbeat(instanceID) =>
+    case Heartbeat(instanceID, metricsReport) =>
       try {
-        instanceManager.reportHeartBeat(instanceID)
+        instanceManager.reportHeartBeat(instanceID, metricsReport)
       } catch {
         case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender.path)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 4e6144a..d27885b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -115,8 +115,9 @@ object TaskManagerMessages {
    * manager which forwards it to the InstanceManager.
    *
    * @param instanceID
+   * @param metricsReport utf-8 encoded JSON report from the metricRegistry.
    */
-  case class Heartbeat(instanceID: InstanceID)
+  case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte])
 
   /**
    * Sends StackTrace Message of an instance with [[instanceID]]. This message is a response to

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index fa98e2d..0c9bf9b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
@@ -53,7 +54,6 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
   var jobClient: Option[ActorRef] = None
 
 
-
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
     val config = getDefaultConfig
 
@@ -65,7 +65,12 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
   }
 
   override def startJobManager(system: ActorSystem): ActorRef = {
-    val (jobManager, _) = JobManager.startJobManagerActors(configuration, system)
+    val config = configuration.clone()
+    val (jobManager, archiver) = JobManager.startJobManagerActors(config, system)
+    if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
+      val webServer = new WebInfoServer(configuration, jobManager, archiver)
+      webServer.start()
+    }
     jobManager
   }
 
@@ -96,10 +101,10 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
     }
 
     TaskManager.startTaskManagerActor(config, system, HOSTNAME, taskManagerActorName,
-                                      singleActorSystem, localExecution, classOf[TaskManager])
+      singleActorSystem, localExecution, classOf[TaskManager])
   }
 
-  def getJobClient(): ActorRef ={
+  def getJobClient(): ActorRef = {
     jobClient match {
       case Some(jc) => jc
       case None =>
@@ -109,7 +114,7 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
         config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
 
         val jc = JobClient.createJobClientFromConfig(config, singleActorSystem,
-                                                            jobClientActorSystem)
+          jobClientActorSystem)
         jobClient = Some(jc)
         jc
     }
@@ -132,25 +137,25 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
   override def shutdown(): Unit = {
     super.shutdown()
 
-    if(!singleActorSystem) {
+    if (!singleActorSystem) {
       jobClientActorSystem.shutdown()
     }
   }
 
   override def awaitTermination(): Unit = {
-    if(!singleActorSystem) {
+    if (!singleActorSystem) {
       jobClientActorSystem.awaitTermination()
     }
     super.awaitTermination()
   }
 
   def initializeIOFormatClasses(configuration: Configuration): Unit = {
-    try{
+    try {
       val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",
         classOf[Configuration])
       om.setAccessible(true)
       om.invoke(null, configuration)
-    }catch {
+    } catch {
       case e: Exception =>
         LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " +
           "follow the specified default behaviour.")
@@ -162,16 +167,16 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
     if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
 
       val bufferMem: Long =
-            config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-                           ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) *
-            config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-                           ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
+        config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+          ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) *
+          config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
 
       val numTaskManager = config.getInteger(
         ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
 
       val memoryFraction = config.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-                                           ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
 
       // full memory size
       var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
@@ -183,7 +188,7 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
       // for each TaskManager, subtract the memory needed for memory buffers
       memorySize -= bufferMem
       memorySize = (memorySize * memoryFraction).toLong
-      memorySize >>>= 20  // bytes to megabytes
+      memorySize >>>= 20 // bytes to megabytes
       config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
     }
   }
@@ -202,3 +207,14 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
     config
   }
 }
+
+object LocalFlinkMiniCluster {
+  val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
+
+  def main(args: Array[String]) {
+    var conf = new Configuration;
+    conf.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 4)
+    conf.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true)
+    var cluster = new LocalFlinkMiniCluster(conf, true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1db4e3e..e948dae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -26,6 +26,10 @@ import management.{GarbageCollectorMXBean, ManagementFactory, MemoryMXBean}
 
 import akka.actor._
 import akka.pattern.ask
+import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
+import com.codahale.metrics.json.MetricsModule
+import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.flink.api.common.cache.DistributedCache
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.fs.Path
@@ -118,6 +122,18 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
   val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
   val fileCache = new FileCache()
   val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]()
+  val metricRegistry = new MetricRegistry
+  // register metrics
+  metricRegistry.register("gc", new GarbageCollectorMetricSet)
+  metricRegistry.register("memory", new MemoryUsageGaugeSet)
+  metricRegistry.register("load", new Gauge[Double] {
+    override def getValue: Double =
+      ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
+  })
+  // register metric serialization
+  val metricRegistryMapper: ObjectMapper =
+    new ObjectMapper().registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS,
+      false, MetricFilter.ALL))
 
   // Actors which want to be notified once this task manager has been registered at the job manager
   val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
@@ -293,7 +309,17 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
       }
 
     case SendHeartbeat =>
-      currentJobManager foreach { _ ! Heartbeat(instanceID) }
+      var report: Array[Byte] = null
+      try {
+        report = metricRegistryMapper.writeValueAsBytes(metricRegistry)
+      } catch {
+        case all: Throwable => log.warning("Error turning the report into JSON", all)
+      }
+
+      currentJobManager foreach {
+        _ ! Heartbeat(instanceID, report)
+      }
+
 
     case LogMemoryUsage =>
       logMemoryStats()

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index b9aa674..a0d0a83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -166,12 +166,12 @@ public class InstanceManagerTest{
 			InstanceID i3 = cm.registerTaskManager(probe3.getRef(), ici3, hardwareDescription, 1);
 
 			// report some immediate heart beats
-			assertTrue(cm.reportHeartBeat(i1));
-			assertTrue(cm.reportHeartBeat(i2));
-			assertTrue(cm.reportHeartBeat(i3));
+			assertTrue(cm.reportHeartBeat(i1, new byte[] {}));
+			assertTrue(cm.reportHeartBeat(i2, new byte[] {}));
+			assertTrue(cm.reportHeartBeat(i3, new byte[] {}));
 			
 			// report heart beat for non-existing instance
-			assertFalse(cm.reportHeartBeat(new InstanceID()));
+			assertFalse(cm.reportHeartBeat(new InstanceID(), new byte[] {}));
 			
 			final long WAIT = 200;
 			CommonTestUtils.sleepUninterruptibly(WAIT);
@@ -185,7 +185,7 @@ public class InstanceManagerTest{
 			long h3 = it.next().getLastHeartBeat();
 
 			// send one heart beat again and verify that the
-			assertTrue(cm.reportHeartBeat(instance1.getId()));
+			assertTrue(cm.reportHeartBeat(instance1.getId(), new byte[] {}));
 			long newH1 = instance1.getLastHeartBeat();
 			
 			long now = System.currentTimeMillis();
@@ -223,7 +223,7 @@ public class InstanceManagerTest{
 				// expected
 			}
 			
-			assertFalse(cm.reportHeartBeat(new InstanceID()));
+			assertFalse(cm.reportHeartBeat(new InstanceID(), new byte[] {}));
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 35d78eb..c8b79ef 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -58,7 +58,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	// --------------------------------------------------------------------------------------------
 
 	public void startCluster() throws Exception{
-		this.executor = startCluster(numTaskManagers, taskManagerNumSlots);
+		this.executor = startCluster(numTaskManagers, taskManagerNumSlots, false);
 	}
 
 	public void stopCluster() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 850cb88..3e4bb33 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -59,7 +59,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 	 * Enum that defines which execution environment to run the next test on:
 	 * An embedded local flink cluster, or the collection execution backend.
 	 */
-	public static enum TestExecutionMode {
+	public enum TestExecutionMode {
 		CLUSTER,
 		COLLECTION
 	}
@@ -88,7 +88,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 
 	@BeforeClass
 	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM);
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 64dd3f1..754314b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -24,6 +24,7 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,6 +32,8 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -42,8 +45,11 @@ import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -55,6 +61,8 @@ import java.util.concurrent.TimeUnit;
 
 public class TestBaseUtils {
 
+	private static final Logger LOG = LoggerFactory.getLogger(TestBaseUtils.class);
+
 	protected static final int MINIMUM_HEAP_SIZE_MB = 192;
 
 	protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
@@ -70,6 +78,8 @@ public class TestBaseUtils {
 	protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration
 			(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
 
+	protected static File logDir;
+
 	protected TestBaseUtils(){
 		verifyJvmOptions();
 	}
@@ -80,9 +90,11 @@ public class TestBaseUtils {
 				+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
 	}
 
-	protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots)
-			throws Exception {
-
+	protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int
+			taskManagerNumSlots, boolean startWebserver) throws Exception {
+		logDir = File.createTempFile("TestBaseUtils-logdir", null);
+		Assert.assertTrue("Unable to delete temp file", logDir.delete());
+		Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
 		Configuration config = new Configuration();
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 		config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
@@ -91,13 +103,18 @@ public class TestBaseUtils {
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
+		config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, startWebserver);
+		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
 		return new ForkableFlinkMiniCluster(config);
 	}
 
 	protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout)
 			throws Exception {
-
-		if (executor != null) {
+		if(logDir != null) {
+			logDir.delete();
+		}
+		if(executor != null) {
 			int numUnreleasedBCVars = 0;
 			int numActiveConnections = 0;
 			{
@@ -385,4 +402,47 @@ public class TestBaseUtils {
 	public static String constructTestURI(Class<?> forClass, String folder) {
 		return new File(constructTestPath(forClass, folder)).toURI().toString();
 	}
+
+	//---------------------------------------------------------------------------------------------
+	// Web utils
+	//---------------------------------------------------------------------------------------------
+
+	/**
+	 * Performs an HTTP GET on the given URL and returns the response body as a string.
+	 *
+	 * For response codes of 400 and above a warning is logged and the content of the
+	 * error stream is returned instead, so that callers can inspect the error page.
+	 *
+	 * @param url the URL to fetch
+	 * @return the response body decoded as UTF-8; empty if the server sent no error body
+	 * @throws Exception if the URL is malformed or connecting/reading fails
+	 */
+	public static String getFromHTTP(String url) throws Exception{
+		URL u = new URL(url);
+		LOG.info("Accessing URL "+url+" as URL: "+u);
+		HttpURLConnection connection = (HttpURLConnection) u.openConnection();
+		connection.setConnectTimeout(100000);
+		connection.connect();
+		InputStream is = null;
+		try {
+			if(connection.getResponseCode() >= 400) {
+				// error!
+				LOG.warn("HTTP Response code when connecting to {} was {}", url, connection.getResponseCode());
+				is = connection.getErrorStream();
+			} else {
+				is = connection.getInputStream();
+			}
+			if(is == null) {
+				// getErrorStream() yields null when the error response carries no body
+				return "";
+			}
+			// Content-Encoding names a compression scheme (e.g. gzip), not a character set,
+			// so it must not be handed to IOUtils.toString as the charset
+			return IOUtils.toString(is, "UTF-8");
+		} finally {
+			// always release the stream, even when reading the body fails
+			IOUtils.closeQuietly(is);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 3b7932e..6df8099 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -21,6 +21,7 @@ package org.apache.flink.test.util
 import akka.actor.{Props, ActorRef, ActorSystem}
 import akka.pattern.Patterns._
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -65,8 +66,6 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
     }
 
-    config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
-
     super.generateConfiguration(config)
   }
 
@@ -83,7 +82,14 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
       libraryCacheManager, archive, accumulatorManager, None, executionRetries,
       delayBetweenRetries, timeout) with TestingJobManager)
 
-    actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
+    val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
+
+    if (userConfiguration.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER,false)){
+      val webServer = new WebInfoServer(configuration, jobManager, archive)
+      webServer.start()
+    }
+
+    jobManager
   }
 
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
new file mode 100644
index 0000000..ab70bcc
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.web;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
+public class WebFrontendITCase extends MultipleProgramsTestBase {
+	// Queries the web server of a locally started mini cluster over HTTP (localhost:8081) and checks the responses.
+	@BeforeClass
+	public static void setup() throws Exception{ // same signature as MultipleProgramsTestBase.setup(), which passes startWebserver=false
+		cluster = TestBaseUtils.startCluster(1, 4, true); // 1 TaskManager, 4 slots per TaskManager, web server enabled
+	}
+	// Forwards the parameterized execution mode to the base class constructor.
+	public WebFrontendITCase(TestExecutionMode m) {
+		super(m);
+	}
+	// Supplies the parameter sets for the Parameterized runner; only CLUSTER mode is included.
+	@Parameterized.Parameters(name = "Execution mode = {0}")
+	public static Collection<TestExecutionMode[]> executionModes(){
+		Collection<TestExecutionMode[]> c = new ArrayList<TestExecutionMode[]>(1);
+		c.add(new TestExecutionMode[] {TestExecutionMode.CLUSTER});
+		return c;
+	}
+	// Expects /jobsInfo?get=taskmanagers to return the exact JSON text with the cluster's TaskManager count and 4 slots.
+	@Test
+	public void getNumberOfTaskManagers() {
+		try {
+			Assert.assertEquals("{\"taskmanagers\": "+cluster.getTaskManagers().size()+", \"slots\": 4}", TestBaseUtils.getFromHTTP("http://localhost:8081/jobsInfo?get=taskmanagers"));
+		}catch(Throwable e) { // Throwable also covers the AssertionError raised by a failed assertion above
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	// Expects /setupInfo?get=taskmanagers to hold a "taskmanagers" JSON array with one entry per TaskManager, the first reporting 4 free slots.
+	@Test
+	public void getTaskmanagers() {
+		try {
+			String json = getFromHTTP("http://localhost:8081/setupInfo?get=taskmanagers");
+			JSONObject parsed = new JSONObject(json);
+			Object taskManagers = parsed.get("taskmanagers");
+			Assert.assertNotNull(taskManagers);
+			Assert.assertTrue(taskManagers instanceof JSONArray);
+			JSONArray tma = (JSONArray) taskManagers;
+			Assert.assertEquals(cluster.numTaskManagers(), tma.length());
+			Object taskManager = tma.get(0);
+			Assert.assertNotNull(taskManager);
+			Assert.assertTrue(taskManager instanceof JSONObject);
+			Assert.assertEquals(4, ((JSONObject) taskManager).getInt("freeSlots"));
+		}catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	// Writes "test content" to jobmanager-main.log in the configured web log directory and expects /logInfo to return it.
+	@Test
+	public void getLogfiles() {
+		try {
+			String logPath = cluster.configuration().getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
+			Assert.assertNotNull(logPath);
+			FileUtils.writeStringToFile(new File(logPath, "jobmanager-main.log"), "test content");
+
+			String logs = getFromHTTP("http://localhost:8081/logInfo");
+			Assert.assertTrue(logs.contains("test content"));
+		}catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	// Expects /setupInfo?get=globalC to report logDir as jobmanager.web.logpath and the cluster configuration's taskmanager.numberOfTaskSlots.
+	@Test
+	public void getConfiguration() {
+		try {
+			String config = getFromHTTP("http://localhost:8081/setupInfo?get=globalC");
+			JSONObject parsed = new JSONObject(config);
+			Assert.assertEquals(logDir.toString(), parsed.getString("jobmanager.web.logpath"));
+			Assert.assertEquals(cluster.configuration().getString("taskmanager.numberOfTaskSlots", null), parsed.getString("taskmanager.numberOfTaskSlots"));
+		}catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 85897b3..0845c81 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 8ab6359..b8ae1b2 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -56,6 +56,11 @@ under the License.
 			<artifactId>flink-yarn</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 60d414f..4e2bed7 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -18,12 +18,12 @@
 package org.apache.flink.yarn;
 
 import com.google.common.base.Joiner;
-import org.apache.commons.io.IOUtils;
 import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -50,9 +50,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -174,16 +171,16 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			LOG.info("Got application URL from YARN {}", url);
 
 			// get number of TaskManagers:
-			Assert.assertEquals("{\"taskmanagers\": 1, \"slots\": 1}", getFromHTTP(url + "jobsInfo?get=taskmanagers"));
+			Assert.assertEquals("{\"taskmanagers\": 1, \"slots\": 1}", TestBaseUtils.getFromHTTP(url + "jobsInfo?get=taskmanagers"));
 
 			// get the configuration from webinterface & check if the dynamic properties from YARN show up there.
-			String config = getFromHTTP(url + "setupInfo?get=globalC");
+			String config = TestBaseUtils.getFromHTTP(url + "setupInfo?get=globalC");
 			JSONObject parsed = new JSONObject(config);
 			Assert.assertEquals("veryFancy", parsed.getString("fancy-configuration-value"));
 			Assert.assertEquals("3", parsed.getString("yarn.maximum-failed-containers"));
 
 			// test logfile access
-			String logs = getFromHTTP(url + "logInfo");
+			String logs = TestBaseUtils.getFromHTTP(url + "logInfo");
 			Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version"));
 		} catch(Throwable e) {
 			LOG.warn("Error while running test",e);
@@ -466,29 +463,4 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		return false;
 	}
 
-
-	///------------------------ Ported tool form: https://github.com/rmetzger/flink/blob/flink1501/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
-
-	public static String getFromHTTP(String url) throws Exception{
-		URL u = new URL(url);
-		LOG.info("Accessing URL "+url+" as URL: "+u);
-		HttpURLConnection connection = (HttpURLConnection) u.openConnection();
-		connection.setConnectTimeout(100000);
-		connection.connect();
-		InputStream is = null;
-		if(connection.getResponseCode() >= 400) {
-			// error!
-			LOG.warn("HTTP Response code when connecting to {} was {}", url, connection.getResponseCode());
-			is = connection.getErrorStream();
-		} else {
-			is = connection.getInputStream();
-		}
-
-		return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
-	}
-
-
-
-
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ec063cf..7e556ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -702,9 +702,13 @@ under the License.
 						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/flot/*</exclude>
 						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/jcanvas.min.js</exclude>
 						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/timeline.js</exclude>
+						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/d3.min.js</exclude>
+						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/d3.layout.min.js</exclude>
+						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/rickshaw.*</exclude>
+						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/css/rickshaw.*</exclude>
 						<!-- Test Data. -->
 						<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
 						<exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude>
 						<exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude>
 						<exclude>flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
 						<exclude>out/test/flink-avro/avro/user.avsc</exclude>


Mime
View raw message