flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/3] flink git commit: [FLINK-2357] [web dashboard] New dashboard backend server supports requests from old web server as well.
Date Thu, 23 Jul 2015 14:13:20 GMT
[FLINK-2357] [web dashboard] New dashboard backend server supports requests from old web server as well.

Also moves TestRunner to test scope.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c52e753a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c52e753a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c52e753a

Branch: refs/heads/master
Commit: c52e753a8d3fc15ed48df7bfa24a327a90df9a0f
Parents: 4473db6
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Jul 23 12:01:13 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jul 23 16:11:35 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  11 -
 flink-runtime-web/pom.xml                       |  30 +-
 .../flink/runtime/webmonitor/TestRunner.java    | 197 ------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  10 +-
 .../handlers/RequestJobIdsHandler.java          |   2 +-
 .../legacy/JobManagerInfoHandler.java           | 705 +++++++++++++++++++
 .../runtime/webmonitor/legacy/JsonFactory.java  | 112 +++
 .../runtime/webmonitor/runner/TestRunner.java   | 198 ++++++
 .../web-dashboard/app/scripts/index.coffee      |   6 +-
 flink-runtime-web/web-dashboard/server.js       |   2 +-
 flink-runtime-web/web-dashboard/web/js/index.js |   5 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   3 -
 .../minicluster/LocalFlinkMiniCluster.scala     |   9 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   9 +-
 14 files changed, 1058 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9690f41..c76741b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -303,11 +303,6 @@ public final class ConfigConstants {
 	 * The option that specifies whether to use the new web frontend
 	 */
 	public static final String JOB_MANAGER_NEW_WEB_FRONTEND_KEY = "jobmanager.new-web-frontend";
-
-	/**
-	 * The port for the runtime monitor web-frontend server.
-	 */
-	public static final String JOB_MANAGER_NEW_WEB_PORT_KEY = "jobmanager.new-web.port";
 	
 	/**
 	 * The config parameter defining the number of archived jobs for the jobmanager
@@ -612,12 +607,6 @@ public final class ConfigConstants {
 	 * Setting this value to {@code -1} disables the web frontend.
 	 */
 	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
-
-	/**
-	 * The config key for the port of the JobManager new web frontend.
-	 * Setting this value to {@code -1} disables the web frontend.
-	 */
-	public static final int DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT = 8082;
 	
 	/**
 	 * The default number of archived jobs for the jobmanager

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index ffe15af..0a05111 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -46,18 +46,6 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
 		<!-- ===================================================
 						Dependencies for the Web Server
 			=================================================== -->
@@ -102,6 +90,24 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
+
+		<!-- ===================================================
+								Testing
+			=================================================== -->
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 		
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
deleted file mode 100644
index eecc81a..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.relational.util.WebLogData;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
-
-/**
- * Simple runner that brings up a local cluster with the web server and executes two
- * jobs to expose their data in the archive
- */
-@SuppressWarnings("serial")
-public class TestRunner {
-
-	public static void main(String[] args) throws Exception {
-
-		// start the cluster with the runtime monitor
-		Configuration configuration = new Configuration();
-		configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
-		configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
-		configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY,
-			"/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
-		
-		LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
-
-		final int port = cluster.getJobManagerRPCPort();
-		runWordCount(port);
-		runWebLogAnalysisExample(port);
-		runWordCount(port);
-
-		Object o = new Object();
-		synchronized (o) {
-			o.wait();
-		}
-		
-		cluster.shutdown();
-	}
-	
-	private static void runWordCount(int port) throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
-		
-		DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
-
-		DataSet<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.groupBy(0)
-						.sum(1);
-
-		counts.print();
-	}
-	
-	private static void runWebLogAnalysisExample(int port) throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
-
-		// get input data
-		DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
-		DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
-		DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
-
-		// Retain documents with keywords
-		DataSet<Tuple1<String>> filterDocs = documents
-				.filter(new FilterDocByKeyWords())
-				.project(0);
-
-		// Filter ranks by minimum rank
-		DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
-				.filter(new FilterByRank());
-
-		// Filter visits by visit date
-		DataSet<Tuple1<String>> filterVisits = visits
-				.filter(new FilterVisitsByDate())
-				.project(0);
-
-		// Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
-		DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
-				filterDocs.join(filterRanks)
-						.where(0).equalTo(1)
-						.projectSecond(0,1,2);
-
-		// Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
-		DataSet<Tuple3<Integer, String, Integer>> result =
-				joinDocsRanks.coGroup(filterVisits)
-						.where(1).equalTo(0)
-						.with(new AntiJoinVisits());
-
-		result.print();
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-	
-	public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
-
-		private static final String[] KEYWORDS = { " editors ", " oscillations " };
-
-		@Override
-		public boolean filter(Tuple2<String, String> value) throws Exception {
-			// FILTER
-			// Only collect the document if all keywords are contained
-			String docText = value.f1;
-			for (String kw : KEYWORDS) {
-				if (!docText.contains(kw)) {
-					return false;
-				}
-			}
-			return true;
-		}
-	}
-
-	public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
-
-		private static final int RANKFILTER = 40;
-
-		@Override
-		public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
-			return (value.f0 > RANKFILTER);
-		}
-	}
-
-
-	public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
-
-		private static final int YEARFILTER = 2007;
-
-		@Override
-		public boolean filter(Tuple2<String, String> value) throws Exception {
-			// Parse date string with the format YYYY-MM-DD and extract the year
-			String dateString = value.f1;
-			int year = Integer.parseInt(dateString.substring(0,4));
-			return (year == YEARFILTER);
-		}
-	}
-	
-	
-	@FunctionAnnotation.ForwardedFieldsFirst("*")
-	public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
-
-		@Override
-		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
-			// Check if there is a entry in the visits relation
-			if (!visits.iterator().hasNext()) {
-				for (Tuple3<Integer, String, Integer> next : ranks) {
-					// Emit all rank pairs
-					out.collect(next);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 3a8dd83..0aa6b07 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.webmonitor.handlers.RequestConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestJobIdsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestOverviewHandler;
+import org.apache.flink.runtime.webmonitor.legacy.JobManagerInfoHandler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,8 +112,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 		
 		// port configuration
-		this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_NEW_WEB_PORT_KEY,
-												ConfigConstants.DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT);
+		this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+												ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 		if (this.configuredPort < 0) {
 			throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
 		}
@@ -133,7 +134,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
 
 //			.GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
-			
+
+			// the handler for the legacy requests
+			.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
+					
 			// this handler serves all the static contents
 			.GET("/:*", new StaticFileServerHandler(webRootDir));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
index a09bc1a..1f28a01 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -38,7 +38,7 @@ import java.util.Map;
  * May serve the IDs of current jobs, or past jobs, depending on whether this handler is
  * given the JobManager or Archive Actor Reference.
  */
-public class RequestJobIdsHandler implements  RequestHandler, RequestHandler.JsonResponse {
+public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
 	
 	private final ActorRef target;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
new file mode 100644
index 0000000..0a1e08c
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.legacy;
+
+import akka.actor.ActorRef;
+
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.KeepAliveWrite;
+import io.netty.handler.codec.http.router.Routed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.ArchiveMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultStringsFound;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
+import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Tuple3;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+@ChannelHandler.Sharable
+public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoHandler.class);
+
+	private static final Charset ENCODING = Charset.forName("UTF-8");
+
+	/** Underlying JobManager */
+	private final ActorRef jobmanager;
+	private final ActorRef archive;
+	private final FiniteDuration timeout;
+
+
+	public JobManagerInfoHandler(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
+		this.jobmanager = jobmanager;
+		this.archive = archive;
+		this.timeout = timeout;
+	}
+
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+		DefaultFullHttpResponse response;
+		try {
+			String result = handleRequest(routed);
+			byte[] bytes = result.getBytes(ENCODING);
+
+			response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+		}
+		catch (Exception e) {
+			byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
+			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+					HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+		}
+
+		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+		
+		KeepAliveWrite.flush(ctx, routed.request(), response);
+	}
+	
+	
+	@SuppressWarnings("unchecked")
+	private String handleRequest(Routed routed) throws Exception {
+		if ("archive".equals(routed.queryParam("get"))) {
+			Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(),
+					new Timeout(timeout));
+
+			Object result = Await.result(response, timeout);
+
+			if(!(result instanceof ArchiveMessages.ArchivedJobs)) {
+				throw new RuntimeException("RequestArchiveJobs requires a response of type " +
+						"ArchivedJobs. Instead the response is of type " + result.getClass() +".");
+			}
+			else {
+				final List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(
+						((ArchiveMessages.ArchivedJobs) result).asJavaCollection());
+
+				return writeJsonForArchive(archivedJobs);
+			}
+		}
+		else if ("jobcounts".equals(routed.queryParam("get"))) {
+			Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
+					new Timeout(timeout));
+
+			Object result = Await.result(response, timeout);
+
+			if (!(result instanceof Tuple3)) {
+				throw new RuntimeException("RequestJobCounts requires a response of type " +
+						"Tuple3. Instead the response is of type " + result.getClass() +
+						".");
+			}
+			else {
+				return writeJsonForJobCounts((Tuple3<Integer, Integer, Integer>) result);
+			}
+		}
+		else if ("job".equals(routed.queryParam("get"))) {
+			String jobId = routed.queryParam("job");
+
+			Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+					new Timeout(timeout));
+
+			Object result = Await.result(response, timeout);
+
+			if (!(result instanceof JobManagerMessages.JobResponse)){
+				throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+						"Instead the response is of type " + result.getClass());
+			}
+			else {
+				final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result;
+
+				if (jobResponse instanceof JobManagerMessages.JobFound){
+					ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)result).executionGraph();
+					return writeJsonForArchivedJob(archivedJob);
+				}
+				else {
+					throw new Exception("DoGet:job: Could not find job for job ID " + jobId);
+				}
+			}
+		}
+		else if ("groupvertex".equals(routed.queryParam("get"))) {
+			String jobId = routed.queryParam("job");
+			String groupVertexId = routed.queryParam("groupvertex");
+
+			// No group vertex specified
+			if (groupVertexId.equals("null")) {
+				throw new Exception("Found null groupVertexId");
+			}
+
+			Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+					new Timeout(timeout));
+
+			Object result = Await.result(response, timeout);
+
+			if (!(result instanceof JobManagerMessages.JobResponse)){
+				throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+						"Instead the response is of type " + result.getClass());
+			}
+			else {
+				final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result;
+
+				if (jobResponse instanceof JobManagerMessages.JobFound) {
+					ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)jobResponse).executionGraph();
+
+					return writeJsonForArchivedJobGroupvertex(archivedJob, JobVertexID.fromHexString(groupVertexId));
+				}
+				else {
+					throw new Exception("DoGet:groupvertex: Could not find job for job ID " + jobId);
+				}
+			}
+		}
+		else if ("taskmanagers".equals(routed.queryParam("get"))) {
+			Future<Object> response = Patterns.ask(jobmanager,
+					JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+					new Timeout(timeout));
+
+			Object result = Await.result(response, timeout);
+
+			if (!(result instanceof Integer)) {
+				throw new RuntimeException("RequestNumberRegisteredTaskManager requires a " +
+						"response of type Integer. Instead the response is of type " +
+						result.getClass() + ".");
+			}
+			else {
+				final int numberOfTaskManagers = (Integer)result;
+
+				final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager,
+						JobManagerMessages.getRequestTotalNumberOfSlots(),
+						new Timeout(timeout));
+
+				final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
+						timeout);
+
+				if (!(resultRegisteredSlots instanceof Integer)) {
+					throw new RuntimeException("RequestTotalNumberOfSlots requires a response of " +
+							"type Integer. Instaed the response of type " +
+							resultRegisteredSlots.getClass() + ".");
+				}
+				else {
+					final int numberOfRegisteredSlots = (Integer) resultRegisteredSlots;
+
+					return "{\"taskmanagers\": " + numberOfTaskManagers + ", " +
+							"\"slots\": " + numberOfRegisteredSlots + "}";
+				}
+			}
+		}
+		else if ("cancel".equals(routed.queryParam("get"))) {
+			String jobId = routed.queryParam("job");
+
+			Future<Object> response = Patterns.ask(jobmanager, new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
+					new Timeout(timeout));
+
+			Await.ready(response, timeout);
+			return "{}";
+		}
+		else if ("updates".equals(routed.queryParam("get"))) {
+			String jobId = routed.queryParam("job");
+			return writeJsonUpdatesForJob(JobID.fromHexString(jobId));
+		}
+		else if ("version".equals(routed.queryParam("get"))) {
+			return writeJsonForVersion();
+		}
+		else{
+			Future<Object> response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(),
+					new Timeout(timeout));
+
+			Object result = Await.result(response, timeout);
+
+			if(!(result instanceof JobManagerMessages.RunningJobs)){
+				throw new RuntimeException("RequestRunningJobs requires a response of type " +
+						"RunningJobs. Instead the response of type " + result.getClass() + ".");
+			}
+			else {
+				final Iterable<ExecutionGraph> runningJobs =
+						((JobManagerMessages.RunningJobs) result).asJavaIterable();
+
+				return writeJsonForJobs(runningJobs);
+			}
+		}
+	}
+
+	private String writeJsonForJobs(Iterable<ExecutionGraph> graphs) {
+		StringBuilder bld = new StringBuilder();
+		bld.append("[");
+
+		Iterator<ExecutionGraph> it = graphs.iterator();
+		// Loop Jobs
+		while(it.hasNext()){
+			ExecutionGraph graph = it.next();
+
+			writeJsonForJob(bld, graph);
+
+			//Write seperator between json objects
+			if(it.hasNext()) {
+				bld.append(",");
+			}
+		}
+		bld.append("]");
+		
+		return bld.toString();
+	}
+
+	private void writeJsonForJob(StringBuilder bld, ExecutionGraph graph) {
+		//Serialize job to json
+		bld.append("{");
+		bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
+		bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
+		bld.append("\"status\": \"").append(graph.getState()).append("\",");
+		bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState())).append(",");
+
+		// Serialize ManagementGraph to json
+		bld.append("\"groupvertices\": [");
+		boolean first = true;
+
+		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+			//Write seperator between json objects
+			if (first) {
+				first = false;
+			} else {
+				bld.append(",");
+			}
+			bld.append(JsonFactory.toJson(groupVertex));
+		}
+		bld.append("]");
+		bld.append("}");
+	}
+
+	private String writeJsonForArchive(List<ExecutionGraph> graphs) {
+		StringBuilder bld = new StringBuilder();
+		bld.append("[");
+
+		// sort jobs by time
+		Collections.sort(graphs, new Comparator<ExecutionGraph>() {
+			@Override
+			public int compare(ExecutionGraph o1, ExecutionGraph o2) {
+				if (o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) {
+					return 1;
+				} else {
+					return -1;
+				}
+			}
+
+		});
+
+		// Loop Jobs
+		for (int i = 0; i < graphs.size(); i++) {
+			ExecutionGraph graph = graphs.get(i);
+
+			//Serialize job to json
+			bld.append("{");
+			bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
+			bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
+			bld.append("\"status\": \"").append(graph.getState()).append("\",");
+			bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState()));
+
+			bld.append("}");
+
+			//Write seperator between json objects
+			if(i != graphs.size() - 1) {
+				bld.append(",");
+			}
+		}
+		bld.append("]");
+		return bld.toString();
+	}
+
+	private String writeJsonForJobCounts(Tuple3<Integer, Integer, Integer> jobCounts) {
+		return "{\"finished\": " + jobCounts._1() + ",\"canceled\": " + jobCounts._2() + ",\"failed\": "
+				+ jobCounts._3() + "}";
+	}
+
+
+	private String writeJsonForArchivedJob(ExecutionGraph graph) {
+		StringBuilder bld = new StringBuilder();
+
+		bld.append("[");
+		bld.append("{");
+		bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
+		bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
+		bld.append("\"status\": \"").append(graph.getState()).append("\",");
+		bld.append("\"SCHEDULED\": ").append(graph.getStatusTimestamp(JobStatus.CREATED)).append(",");
+		bld.append("\"RUNNING\": ").append(graph.getStatusTimestamp(JobStatus.RUNNING)).append(",");
+		bld.append("\"FINISHED\": ").append(graph.getStatusTimestamp(JobStatus.FINISHED)).append(",");
+		bld.append("\"FAILED\": ").append(graph.getStatusTimestamp(JobStatus.FAILED)).append(",");
+		bld.append("\"CANCELED\": ").append(graph.getStatusTimestamp(JobStatus.CANCELED)).append(",");
+
+		if (graph.getState() == JobStatus.FAILED) {
+			bld.append("\"failednodes\": [");
+			boolean first = true;
+			for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
+				if (vertex.getExecutionState() == ExecutionState.FAILED) {
+					InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+					Throwable failureCause = vertex.getFailureCause();
+					if (location != null || failureCause != null) {
+						if (first) {
+							first = false;
+						} else {
+							bld.append(",");
+						}
+						bld.append("{");
+						bld.append("\"node\": \"").append(location == null ? "(none)" : location.getFQDNHostname()).append("\",");
+						bld.append("\"message\": \"").append(failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))).append("\"");
+						bld.append("}");
+					}
+				}
+			}
+			bld.append("],");
+		}
+
+		// Serialize ManagementGraph to json
+		bld.append("\"groupvertices\": [");
+		boolean first = true;
+		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+			//Write seperator between json objects
+			if (first) {
+				first = false;
+			} else {
+				bld.append(",");
+			}
+
+			bld.append(JsonFactory.toJson(groupVertex));
+
+		}
+		bld.append("],");
+
+		// write user config
+		ExecutionConfig ec = graph.getExecutionConfig();
+		if(ec != null) {
+			bld.append("\"executionConfig\": {");
+			bld.append("\"Execution Mode\": \"").append(ec.getExecutionMode()).append("\",");
+			bld.append("\"Number of execution retries\": \"").append(ec.getNumberOfExecutionRetries()).append("\",");
+			bld.append("\"Job parallelism\": \"").append(ec.getParallelism()).append("\",");
+			bld.append("\"Object reuse mode\": \"").append(ec.isObjectReuseEnabled()).append("\"");
+			ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
+			if(uc != null) {
+				Map<String, String> ucVals = uc.toMap();
+				if (ucVals != null) {
+					String ucString = "{";
+					int i = 0;
+					for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
+						ucString += "\"" + ucVal.getKey() + "\":\"" + ucVal.getValue() + "\"";
+						if (++i < ucVals.size()) {
+							ucString += ",\n";
+						}
+					}
+					bld.append(", \"userConfig\": ").append(ucString).append("}");
+				}
+				else {
+					LOG.debug("GlobalJobParameters.toMap() did not return anything");
+				}
+			}
+			else {
+				LOG.debug("No GlobalJobParameters were set in the execution config");
+			}
+			bld.append("},");
+		}
+		else {
+			LOG.warn("Unable to retrieve execution config from execution graph");
+		}
+
+		// write accumulators
+		final Future<Object> response = Patterns.ask(jobmanager,
+				new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));
+
+		Object result;
+		try {
+			result = Await.result(response, timeout);
+		}
+		catch (Exception ex) {
+			throw new RuntimeException("Could not retrieve the accumulator results from the job manager.", ex);
+		}
+
+		if (result instanceof AccumulatorResultStringsFound) {
+			StringifiedAccumulatorResult[] accumulators = ((AccumulatorResultStringsFound) result).result();
+
+			bld.append("\n\"accumulators\": [");
+			int i = 0;
+			for (StringifiedAccumulatorResult accumulator : accumulators) {
+				bld.append("{ \"name\": \"").append(accumulator.getName()).append(" (").append(accumulator.getType()).append(")\",").append(" \"value\": \"").append(accumulator.getValue()).append("\"}\n");
+				if (++i < accumulators.length) {
+					bld.append(",");
+				}
+			}
+			bld.append("],\n");
+		}
+		else if (result instanceof AccumulatorResultsNotFound) {
+			bld.append("\n\"accumulators\": [],");
+		}
+		else if (result instanceof AccumulatorResultsErroneous) {
+			LOG.error("Could not obtain accumulators for job " + graph.getJobID(),
+					((AccumulatorResultsErroneous) result).cause());
+		}
+		else {
+			throw new RuntimeException("RequestAccumulatorResults requires a response of type " +
+					"AccumulatorResultStringsFound. Instead the response is of type " +
+					result.getClass() + ".");
+		}
+
+		bld.append("\"groupverticetimes\": {");
+		first = true;
+
+		for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+			if (first) {
+				first = false;
+			} else {
+				bld.append(",");
+			}
+
+			// Calculate start and end time for groupvertex
+			long started = Long.MAX_VALUE;
+			long ended = 0;
+
+			// Take earliest running state and latest endstate of groupmembers
+			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+
+				long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
+				if (running != 0 && running < started) {
+					started = running;
+				}
+
+				long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
+				long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
+				long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
+
+				if (finished != 0 && finished > ended) {
+					ended = finished;
+				}
+
+				if (canceled != 0 && canceled > ended) {
+					ended = canceled;
+				}
+
+				if (failed != 0 && failed > ended) {
+					ended = failed;
+				}
+
+			}
+
+			bld.append("\"").append(groupVertex.getJobVertexId()).append("\": {");
+			bld.append("\"groupvertexid\": \"").append(groupVertex.getJobVertexId()).append("\",");
+			bld.append("\"groupvertexname\": \"").append(groupVertex).append("\",");
+			bld.append("\"STARTED\": ").append(started).append(",");
+			bld.append("\"ENDED\": ").append(ended);
+			bld.append("}");
+
+		}
+
+		bld.append("}");
+		bld.append("}");
+		bld.append("]");
+		
+		return bld.toString();
+	}
+
+
+	private String writeJsonUpdatesForJob(JobID jobId) {
+		final Future<Object> responseArchivedJobs = Patterns.ask(jobmanager,
+				JobManagerMessages.getRequestRunningJobs(),
+				new Timeout(timeout));
+
+		Object resultArchivedJobs;
+		try{
+			resultArchivedJobs = Await.result(responseArchivedJobs, timeout);
+		}
+		catch (Exception ex) {
+			throw new RuntimeException("Could not retrieve archived jobs from the job manager.", ex);
+		}
+
+		if(!(resultArchivedJobs instanceof JobManagerMessages.RunningJobs)){
+			throw new RuntimeException("RequestArchivedJobs requires a response of type " +
+					"RunningJobs. Instead the response is of type " +
+					resultArchivedJobs.getClass() + ".");
+		}
+		else {
+			final Iterable<ExecutionGraph> graphs = ((JobManagerMessages.RunningJobs)resultArchivedJobs).
+					asJavaIterable();
+
+			//Serialize job to json
+			final StringBuilder bld = new StringBuilder();
+			
+			bld.append("{");
+			bld.append("\"jobid\": \"").append(jobId).append("\",");
+			bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\",");
+			bld.append("\"recentjobs\": [");
+
+			boolean first = true;
+
+			for (ExecutionGraph g : graphs){
+				if (first) {
+					first = false;
+				} else {
+					bld.append(",");
+				}
+
+				bld.append("\"").append(g.getJobID()).append("\"");
+			}
+			bld.append("],");
+
+			final Future<Object> responseJob = Patterns.ask(jobmanager, new JobManagerMessages.RequestJob(jobId),
+					new Timeout(timeout));
+
+			Object resultJob;
+			try{
+				resultJob = Await.result(responseJob, timeout);
+			}
+			catch (Exception ex){
+				throw new RuntimeException("Could not retrieve the job with jobID " + jobId +
+						"from the job manager.", ex);
+			}
+
+			if (!(resultJob instanceof JobManagerMessages.JobResponse)) {
+				throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+						"Instead the response is of type " + resultJob.getClass() + ".");
+			}
+			else {
+				final JobManagerMessages.JobResponse response = (JobManagerMessages.JobResponse) resultJob;
+
+				if (response instanceof JobManagerMessages.JobFound){
+					ExecutionGraph graph = ((JobManagerMessages.JobFound)response).executionGraph();
+
+					bld.append("\"vertexevents\": [");
+
+					first = true;
+					for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
+						if (first) {
+							first = false;
+						} else {
+							bld.append(",");
+						}
+
+						bld.append("{");
+						bld.append("\"vertexid\": \"").append(ev.getCurrentExecutionAttempt().getAttemptId()).append("\",");
+						bld.append("\"newstate\": \"").append(ev.getExecutionState()).append("\",");
+						bld.append("\"timestamp\": \"").append(ev.getStateTimestamp(ev.getExecutionState())).append("\"");
+						bld.append("}");
+					}
+
+					bld.append("],");
+
+					bld.append("\"jobevents\": [");
+
+					bld.append("{");
+					bld.append("\"newstate\": \"").append(graph.getState()).append("\",");
+					bld.append("\"timestamp\": \"").append(graph.getStatusTimestamp(graph.getState())).append("\"");
+					bld.append("}");
+
+					bld.append("]");
+
+					bld.append("}");
+				}
+				else {
+					bld.append("\"vertexevents\": [],");
+					bld.append("\"jobevents\": [");
+					bld.append("{");
+					bld.append("\"newstate\": \"").append(JobStatus.FINISHED.toString()).append("\",");
+					bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\"");
+					bld.append("}");
+					bld.append("]");
+					bld.append("}");
+				}
+			}
+			
+			return bld.toString();
+		}
+	}
+	
+	private String writeJsonForArchivedJobGroupvertex(ExecutionGraph graph, JobVertexID vertexId) {
+		ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
+		StringBuilder bld = new StringBuilder();
+		
+		bld.append("{\"groupvertex\": ").append(JsonFactory.toJson(jobVertex)).append(",");
+
+		bld.append("\"verticetimes\": {");
+		boolean first = true;
+		for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
+
+			for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+
+				Execution exec = vertex.getCurrentExecutionAttempt();
+
+				if(first) {
+					first = false;
+				} else {
+					bld.append(","); }
+
+				bld.append("\"").append(exec.getAttemptId()).append("\": {");
+				bld.append("\"vertexid\": \"").append(exec.getAttemptId()).append("\",");
+				bld.append("\"vertexname\": \"").append(vertex).append("\",");
+				bld.append("\"CREATED\": ").append(vertex.getStateTimestamp(ExecutionState.CREATED)).append(",");
+				bld.append("\"SCHEDULED\": ").append(vertex.getStateTimestamp(ExecutionState.SCHEDULED)).append(",");
+				bld.append("\"DEPLOYING\": ").append(vertex.getStateTimestamp(ExecutionState.DEPLOYING)).append(",");
+				bld.append("\"RUNNING\": ").append(vertex.getStateTimestamp(ExecutionState.RUNNING)).append(",");
+				bld.append("\"FINISHED\": ").append(vertex.getStateTimestamp(ExecutionState.FINISHED)).append(",");
+				bld.append("\"CANCELING\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELING)).append(",");
+				bld.append("\"CANCELED\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELED)).append(",");
+				bld.append("\"FAILED\": ").append(vertex.getStateTimestamp(ExecutionState.FAILED)).append("");
+				bld.append("}");
+			}
+
+		}
+		bld.append("}}");
+		return bld.toString();
+	}
+
+	
+	private String writeJsonForVersion() {
+		return "{\"version\": \"" + EnvironmentInformation.getVersion() + "\",\"revision\": \"" + 
+				EnvironmentInformation.getRevisionInformation().commitId + "\"}";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
new file mode 100644
index 0000000..fe18d3f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.legacy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class JsonFactory {
+
+	public static String toJson(ExecutionVertex vertex) {
+		StringBuilder json = new StringBuilder("");
+		json.append("{");
+		json.append("\"vertexid\": \"").append(vertex.getCurrentExecutionAttempt().getAttemptId()).append("\",");
+		json.append("\"vertexname\": \"").append(StringUtils.escapeHtml(vertex.getSimpleName())).append("\",");
+		json.append("\"vertexstatus\": \"").append(vertex.getExecutionState()).append("\",");
+		
+		InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+		String instanceName = location == null ? "(null)" : location.getFQDNHostname();
+		
+		json.append("\"vertexinstancename\": \"").append(instanceName).append("\"");
+		json.append("}");
+		return json.toString();
+	}
+	
+	public static String toJson(ExecutionJobVertex jobVertex) {
+		StringBuilder json = new StringBuilder("");
+		
+		json.append("{");
+		json.append("\"groupvertexid\": \"").append(jobVertex.getJobVertexId()).append("\",");
+		json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\",");
+		json.append("\"numberofgroupmembers\": ").append(jobVertex.getParallelism()).append(",");
+		json.append("\"groupmembers\": [");
+		
+		// Count state status of group members
+		Map<ExecutionState, Integer> stateCounts = new HashMap<ExecutionState, Integer>();
+		
+		// initialize with 0
+		for (ExecutionState state : ExecutionState.values()) {
+			stateCounts.put(state, 0);
+		}
+		
+		ExecutionVertex[] vertices = jobVertex.getTaskVertices();
+		
+		for (int j = 0; j < vertices.length; j++) {
+			ExecutionVertex vertex = vertices[j];
+			
+			json.append(toJson(vertex));
+			
+			// print delimiter
+			if (j != vertices.length - 1) {
+				json.append(",");
+			}
+			
+			// Increment state status count
+			int count =  stateCounts.get(vertex.getExecutionState()) + 1;
+			stateCounts.put(vertex.getExecutionState(), count);
+		}
+		
+		json.append("],");
+		json.append("\"backwardEdges\": [");
+		
+		List<IntermediateResult> inputs = jobVertex.getInputs();
+		
+		for (int inputNumber = 0; inputNumber < inputs.size(); inputNumber++) {
+			ExecutionJobVertex input = inputs.get(inputNumber).getProducer();
+			
+			json.append("{");
+			json.append("\"groupvertexid\": \"").append(input.getJobVertexId()).append("\",");
+			json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\"");
+			json.append("}");
+			
+			// print delimiter
+			if(inputNumber != inputs.size() - 1) {
+				json.append(",");
+			}
+		}
+		json.append("]");
+		
+		// list number of members for each status
+		for (Map.Entry<ExecutionState, Integer> stateCount : stateCounts.entrySet()) {
+			json.append(",\"").append(stateCount.getKey()).append("\": ").append(stateCount.getValue());
+		}
+		
+		json.append("}");
+		
+		return json.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
new file mode 100644
index 0000000..9a9b6ba
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.runner;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.relational.util.WebLogData;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * Simple runner that brings up a local cluster with the web server and executes two
+ * jobs to expose their data in the archive
+ */
+@SuppressWarnings("serial")
+public class TestRunner {
+
+	public static void main(String[] args) throws Exception {
+
+		// start the cluster with the runtime monitor
+		Configuration configuration = new Configuration();
+		configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
+		configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
+		configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY,
+			"/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
+		
+		LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
+
+		final int port = cluster.getJobManagerRPCPort();
+		runWordCount(port);
+		runWebLogAnalysisExample(port);
+		runWordCount(port);
+
+		// block the thread
+		Object o = new Object();
+		synchronized (o) {
+			o.wait();
+		}
+		
+		cluster.shutdown();
+	}
+	
+	private static void runWordCount(int port) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
+		
+		DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		counts.print();
+	}
+	
+	private static void runWebLogAnalysisExample(int port) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
+
+		// get input data
+		DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
+		DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
+		DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
+
+		// Retain documents with keywords
+		DataSet<Tuple1<String>> filterDocs = documents
+				.filter(new FilterDocByKeyWords())
+				.project(0);
+
+		// Filter ranks by minimum rank
+		DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
+				.filter(new FilterByRank());
+
+		// Filter visits by visit date
+		DataSet<Tuple1<String>> filterVisits = visits
+				.filter(new FilterVisitsByDate())
+				.project(0);
+
+		// Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
+		DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
+				filterDocs.join(filterRanks)
+						.where(0).equalTo(1)
+						.projectSecond(0,1,2);
+
+		// Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
+		DataSet<Tuple3<Integer, String, Integer>> result =
+				joinDocsRanks.coGroup(filterVisits)
+						.where(1).equalTo(0)
+						.with(new AntiJoinVisits());
+
+		result.print();
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+	
+	public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
+
+		private static final String[] KEYWORDS = { " editors ", " oscillations " };
+
+		@Override
+		public boolean filter(Tuple2<String, String> value) throws Exception {
+			// FILTER
+			// Only collect the document if all keywords are contained
+			String docText = value.f1;
+			for (String kw : KEYWORDS) {
+				if (!docText.contains(kw)) {
+					return false;
+				}
+			}
+			return true;
+		}
+	}
+
+	public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
+
+		private static final int RANKFILTER = 40;
+
+		@Override
+		public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
+			return (value.f0 > RANKFILTER);
+		}
+	}
+
+
+	public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
+
+		private static final int YEARFILTER = 2007;
+
+		@Override
+		public boolean filter(Tuple2<String, String> value) throws Exception {
+			// Parse date string with the format YYYY-MM-DD and extract the year
+			String dateString = value.f1;
+			int year = Integer.parseInt(dateString.substring(0,4));
+			return (year == YEARFILTER);
+		}
+	}
+	
+	
+	@FunctionAnnotation.ForwardedFieldsFirst("*")
+	public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
+			// Check if there is a entry in the visits relation
+			if (!visits.iterator().hasNext()) {
+				for (Tuple3<Integer, String, Integer> next : ranks) {
+					// Emit all rank pairs
+					out.collect(next);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index b85d7e0..385b3d6 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -29,11 +29,9 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
 # --------------------------------------
 
 .constant 'flinkConfig', {
-  webServer: 'http://localhost:8080'
   jobServer: 'http://localhost:8081'
-  newServer: 'http://localhost:8082'
-#  webServer: 'http://localhost:3000/web-server'
-#  jobServer: 'http://localhost:3000/job-server'
+  newServer: 'http://localhost:8081'
+#  jobServer: 'http://localhost:3000/new-server'
 #  newServer: 'http://localhost:3000/new-server'
   refreshInterval: 10000
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/web-dashboard/server.js
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/server.js b/flink-runtime-web/web-dashboard/server.js
index 797cfa0..453e7a4 100644
--- a/flink-runtime-web/web-dashboard/server.js
+++ b/flink-runtime-web/web-dashboard/server.js
@@ -29,7 +29,7 @@ var server = new Hapi.Server();
 var remotes = [
   { port: 8080, path: 'web-server' },
   { port: 8081, path: 'job-server' },
-  { port: 8082, path: 'new-server' }
+  { port: 8081, path: 'new-server' }
 ]
 
 server.connection({ port: 3000 });


Mime
View raw message