flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [11/14] flink git commit: [FLINK-2978] [web-dashboard] Integrate web client into Web Frontend
Date Thu, 31 Dec 2015 14:33:34 GMT
[FLINK-2978] [web-dashboard] Integrate web client into Web Frontend

This closes #1338


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72f9a6c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72f9a6c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72f9a6c6

Branch: refs/heads/master
Commit: 72f9a6c6a753123948153ee9f2e91ece70c2f71c
Parents: 4f8c5e8
Author: Sachin Goel <sachingoel0101@gmail.com>
Authored: Thu Nov 5 10:40:44 2015 +0530
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Dec 31 00:45:27 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  16 +-
 flink-dist/src/main/resources/flink-conf.yaml   |   4 +
 flink-runtime-web/pom.xml                       |   5 +
 .../runtime/webmonitor/HttpRequestHandler.java  | 131 +++++++++
 .../webmonitor/PipelineErrorHandler.java        |  83 ++++++
 .../webmonitor/RuntimeMonitorHandler.java       |  16 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  69 ++++-
 .../AbstractExecutionGraphRequestHandler.java   |   6 +-
 .../handlers/ClusterOverviewHandler.java        |   2 +-
 .../handlers/CurrentJobIdsHandler.java          |   2 +-
 .../handlers/CurrentJobsOverviewHandler.java    |   2 +-
 .../handlers/DashboardConfigHandler.java        |   2 +-
 .../handlers/JarAccessDeniedHandler.java        |  34 +++
 .../webmonitor/handlers/JarActionHandler.java   | 201 +++++++++++++
 .../webmonitor/handlers/JarDeleteHandler.java   |  70 +++++
 .../webmonitor/handlers/JarListHandler.java     | 122 ++++++++
 .../webmonitor/handlers/JarPlanHandler.java     |  55 ++++
 .../webmonitor/handlers/JarRunHandler.java      |  74 +++++
 .../webmonitor/handlers/JarUploadHandler.java   |  53 ++++
 .../handlers/JobCancellationHandler.java        |   6 +-
 .../handlers/JobManagerConfigHandler.java       |   2 +-
 .../webmonitor/handlers/RequestHandler.java     |   2 +-
 .../handlers/TaskManagersHandler.java           |   8 +-
 flink-runtime-web/web-dashboard/app/index.jade  |   5 +
 .../web-dashboard/app/partials/submit.jade      | 127 ++++++++
 .../web-dashboard/app/scripts/index.coffee      |   7 +
 .../scripts/modules/submit/submit.ctrl.coffee   | 179 ++++++++++++
 .../scripts/modules/submit/submit.svc.coffee    |  59 ++++
 .../web-dashboard/app/styles/index.styl         |  93 +++++-
 .../web-dashboard/web/css/index.css             |  98 +++++++
 flink-runtime-web/web-dashboard/web/index.html  |   2 +
 flink-runtime-web/web-dashboard/web/js/index.js | 288 +++++++++++++++++++
 .../web-dashboard/web/partials/submit.html      | 109 +++++++
 33 files changed, 1896 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 8a356b2..1626e79 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -346,6 +346,9 @@ public final class ConfigConstants {
 	 */
 	public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";
 
+	/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
+	public static final String JOB_MANAGER_WEB_SUBMISSION_KEY = "jobmanager.web.submit.enable";
+
 	/** Flag to disable checkpoint stats. */
 	public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable";
 
@@ -690,17 +693,16 @@ public final class ConfigConstants {
 	
 	// ------------------------- JobManager Web Frontend ----------------------
 	
-	/**
-	 * The config key for the port of the JobManager web frontend.
-	 * Setting this value to {@code -1} disables the web frontend.
-	 */
+	/** The default port of the JobManager web frontend.
+	 * Setting the port to {@code -1} disables the web frontend. */
 	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
 	
-	/**
-	 * The default number of archived jobs for the jobmanager
-	 */
+	/** The default number of archived jobs for the jobmanager */
 	public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5;
 
+	/** By default, submitting jobs from the web-frontend is allowed. */
+	public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMISSION = true;
+
 	/** Default flag to disable checkpoint stats. */
 	public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 08adf04..d1d9b8a 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -66,6 +66,10 @@ parallelism.default: 1
 
 jobmanager.web.port: 8081
 
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#jobmanager.web.submit.enable: false
 
 # The port uder which the standalone web client
 # (for job upload and submit) listens.

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 52283b9..c49900d 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -57,6 +57,11 @@ under the License.
 			<artifactId>flink-runtime</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 
 		<!-- ===================================================
 						Dependencies for the Web Server

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
new file mode 100644
index 0000000..f0f3c92
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+/*****************************************************************************
+ * This code is based on the "HttpUploadServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * https://github.com/netty/netty/blob/netty-4.0.31.Final/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java
+ *****************************************************************************/
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.QueryStringEncoder;
+import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import io.netty.handler.codec.http.multipart.DiskFileUpload;
+import io.netty.handler.codec.http.multipart.HttpDataFactory;
+import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.EndOfDataDecoderException;
+import io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
+
+import java.io.File;
+import java.util.UUID;
+
+/**
+ * Handler that receives all HTTP requests from the user. Requests that are not POST requests
+ * are passed on to the next handler (the router) directly.
+ * For POST requests, it decodes the posted data, moves each completed file upload into the
+ * upload directory, and passes the stored file name on as the {@code file} query parameter.
+ */
+public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+	private HttpRequest request;
+
+	private boolean readingChunks;
+
+	private static final HttpDataFactory factory = new DefaultHttpDataFactory(true); // use disk
+
+	private String requestPath;
+
+	private HttpPostRequestDecoder decoder;
+
+	private final File uploadDir;
+
+	/**
+	 * Creates a new handler which moves uploaded files to the given directory.
+	 */
+	public HttpRequestHandler(File uploadDir) {
+		this.uploadDir = uploadDir;
+	}
+
+	@Override
+	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+		if (decoder != null) {
+			decoder.cleanFiles();
+		}
+	}
+
+	@Override
+	public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
+		if (msg instanceof HttpRequest) {
+			request = (HttpRequest) msg;
+			requestPath = new QueryStringDecoder(request.getUri()).path();
+			if (request.getMethod() != HttpMethod.POST) {
+				ctx.fireChannelRead(request);
+			} else {
+				// try to decode the posted data now.
+				decoder = new HttpPostRequestDecoder(factory, request);
+				readingChunks = HttpHeaders.isTransferEncodingChunked(request);
+				if (readingChunks) {
+					readingChunks = true;
+				}
+			}
+		} else if (decoder != null && msg instanceof HttpContent) {
+			// New chunk is received
+			HttpContent chunk = (HttpContent) msg;
+			decoder.offer(chunk);
+			try {
+				while (decoder.hasNext()) {
+					InterfaceHttpData data = decoder.next();
+					// IF SOMETHING EVER NEEDS POST PARAMETERS, THIS WILL BE THE PLACE TO HANDLE IT
+					// all fields values will be passed with type Attribute.
+					if (data.getHttpDataType() == HttpDataType.FileUpload) {
+						DiskFileUpload file = (DiskFileUpload) data;
+						if (file.isCompleted()) {
+							String newName = UUID.randomUUID() + "_" + file.getFilename();
+							file.renameTo(new File(uploadDir, newName));
+							QueryStringEncoder encoder = new QueryStringEncoder(requestPath);
+							encoder.addParam("file", newName);
+							request.setUri(encoder.toString());
+						}
+					}
+					data.release();
+				}
+			} catch (EndOfDataDecoderException e) {
+				// the decoder has reached the end of the posted data; nothing more to read here
+			}
+			if (chunk instanceof LastHttpContent) {
+				readingChunks = false;
+				decoder.destroy();
+				decoder = null;
+				ctx.fireChannelRead(request);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
new file mode 100644
index 0000000..2618b21
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Last handler in the pipeline: logs unhandled messages and exceptions and replies with an error.
+ */
+@ChannelHandler.Sharable
+public class PipelineErrorHandler extends SimpleChannelInboundHandler<Object> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelineErrorHandler.class);
+
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, Object message) {
+		// we can't deal with this message. No one in the pipeline handled it. Log it.
+		LOG.debug("Unknown message received: {}", message);
+		sendError(ctx, "Unknown message received.");
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+		LOG.debug("Unhandled exception: {}", cause);
+		sendError(ctx, cause.getMessage());
+	}
+
+	private void sendError(ChannelHandlerContext ctx, String error) {
+		DefaultFullHttpResponse response;
+		StringWriter writer = new StringWriter();
+		try {
+			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+			gen.writeStartObject();
+			gen.writeStringField("error", error);
+			gen.writeEndObject();
+			gen.close();
+			// send a bad request status code.
+			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+					HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(writer.toString().getBytes()));
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+		} catch (IOException e) {
+			// generating the JSON response failed; fall back to sending the error as plain text.
+			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+					HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(error.getBytes()));
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+		}
+		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+		if (ctx.channel().isActive()) {
+			ctx.writeAndFlush(response);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index e5815c7..c7d1e51 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -40,7 +40,10 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -67,6 +70,8 @@ public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {
 
 	private String localJobManagerAddress;
 
+	public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
+
 	public RuntimeMonitorHandler(
 			RequestHandler handler,
 			JobManagerRetriever retriever,
@@ -113,7 +118,16 @@ public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {
 		DefaultFullHttpResponse response;
 
 		try {
-			String result = handler.handleRequest(routed.pathParams(), jobManager);
+			// we only pass the first element in the list to the handlers.
+			Map<String, String> queryParams = new HashMap<>();
+			for (String key : routed.queryParams().keySet()) {
+				queryParams.put(key, routed.queryParam(key));
+			}
+
+			InetSocketAddress address = (InetSocketAddress) ctx.channel().localAddress();
+			queryParams.put(WEB_MONITOR_ADDRESS_KEY, address.getHostName() + ":" + address.getPort());
+
+			String result = handler.handleRequest(routed.pathParams(), queryParams, jobManager);
 			byte[] bytes = result.getBytes(ENCODING);
 
 			response = new DefaultFullHttpResponse(

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 2e3a792..4117800 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -112,6 +112,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private final File webRootDir;
 
+	private final File uploadDir;
+
 	private AtomicBoolean isShutdown = new AtomicBoolean();
 
 
@@ -119,7 +121,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			Configuration config,
 			LeaderRetrievalService leaderRetrievalService,
 			ActorSystem actorSystem) throws IOException, InterruptedException {
-		
+
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
 
 		final WebMonitorConfig cfg = new WebMonitorConfig(config);
@@ -127,10 +129,24 @@ public class WebRuntimeMonitor implements WebMonitor {
 		// create an empty directory in temp for the web server
 		String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
 		webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
+
+		// create storage for uploads
+		fileName = String.format("flink-web-upload-%s", UUID.randomUUID().toString());
+		uploadDir = new File(System.getProperty("java.io.tmpdir"), fileName);
+		if (!uploadDir.mkdir() || !uploadDir.canWrite()) {
+			throw new IOException("Unable to create temporary directory to support jar uploads.");
+		}
+
 		LOG.info("Using directory {} for the web interface files", webRootDir);
+		LOG.info("Using directory {} for web frontend JAR file uploads", uploadDir);
 
 		final WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(config);
-		
+
+		final boolean webSubmitAllow = config.getBoolean(
+				ConfigConstants.JOB_MANAGER_WEB_SUBMISSION_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMISSION
+		);
+
 		// port configuration
 		int configuredPort = cfg.getWebFrontendPort();
 		if (configuredPort < 0) {
@@ -143,7 +159,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		retriever = new JobManagerRetriever(this, actorSystem, lookupTimeout, timeout);
 
 		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
-		
+
 		router = new Router()
 			// config how to interact with this web server
 			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -193,10 +209,34 @@ public class WebRuntimeMonitor implements WebMonitor {
 			// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
 			.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
 			// DELETE is the preferred way of cancelling a job (Rest-conform)
-			.DELETE("/jobs/:jobid", handler(new JobCancellationHandler()))
+			.DELETE("/jobs/:jobid", handler(new JobCancellationHandler()));
+
+		if (webSubmitAllow) {
+			router
+				// fetch the list of uploaded jars.
+				.GET("/jars", handler(new JarListHandler(uploadDir)))
+
+				// get plan for an uploaded jar
+				.GET("/jars/:jarid/plan", handler(new JarPlanHandler(uploadDir)))
+
+				// run a jar
+				.POST("/jars/:jarid/run", handler(new JarRunHandler(uploadDir, timeout)))
+
+				// upload a jar
+				.POST("/jars/upload", handler(new JarUploadHandler(uploadDir)))
+
+				// delete an uploaded jar from submission interface
+				.DELETE("/jars/:jarid", handler(new JarDeleteHandler(uploadDir)));
+		} else {
+			router
+				// send an Access Denied message (sort of)
+				// Every other GET request will go to the File Server, which will not provide
+				// access to the jar directory anyway, because it doesn't exist in webRootDir.
+				.GET("/jars", handler(new JarAccessDeniedHandler()));
+		}
 
-			// this handler serves all the static contents
-			.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir));
+		// this handler serves all the static contents
+		router.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir));
 
 		synchronized (startupShutdownLock) {
 
@@ -224,9 +264,9 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 					ch.pipeline()
 							.addLast(new HttpServerCodec())
-							.addLast(new HttpObjectAggregator(65536))
-							.addLast(new ChunkedWriteHandler())
-							.addLast(handler.name(), handler);
+							.addLast(new HttpRequestHandler(uploadDir))
+							.addLast(handler.name(), handler)
+							.addLast(new PipelineErrorHandler());
 				}
 			};
 
@@ -298,10 +338,17 @@ public class WebRuntimeMonitor implements WebMonitor {
 			return;
 		}
 		try {
-			LOG.info("Removing web root dir {}", webRootDir);
+			LOG.info("Removing web dashboard root cache directory {}", webRootDir);
 			FileUtils.deleteDirectory(webRootDir);
 		} catch (Throwable t) {
-			LOG.warn("Error while deleting web root dir {}", webRootDir, t);
+			LOG.warn("Error while deleting web root directory {}", webRootDir, t);
+		}
+
+		try {
+			LOG.info("Removing web dashboard jar upload directory {}", uploadDir);
+			FileUtils.deleteDirectory(uploadDir);
+		} catch (Throwable t) {
+			LOG.warn("Error while deleting web storage dir {}", uploadDir, t);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 4df387a..16cfb1a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -39,8 +39,8 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 	}
 
 	@Override
-	public final String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
-		String jidString = params.get("jobid");
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		String jidString = pathParams.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");
 		}
@@ -58,7 +58,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 			throw new NotFoundException("Could not find job with id " + jid);
 		}
 		
-		return handleRequest(eg, params);
+		return handleRequest(eg, pathParams);
 	}
 	
 	public abstract String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index a1f6a76..ae43a00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -49,7 +49,7 @@ public class ClusterOverviewHandler implements RequestHandler, RequestHandler.Js
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 06fe34b..41b118c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -53,7 +53,7 @@ public class CurrentJobIdsHandler implements RequestHandler, RequestHandler.Json
 	}
 	
 	@Override
-	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {
 			if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 3ca0420..d9e0096 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -55,7 +55,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			if (jobManager != null) {
 				Future<Object> future = jobManager.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 4027782..71816df 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -67,7 +67,7 @@ public class DashboardConfigHandler implements RequestHandler, RequestHandler.Js
 	}
 	
 	@Override
-	public String handleRequest(Map<String, String> params, ActorGateway jobManagerGateway) {
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		return this.configString;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
new file mode 100644
index 0000000..024433d
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.util.Map;
+
+public class JarAccessDeniedHandler implements RequestHandler, RequestHandler.JsonResponse {
+
+	private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " +
+			"available for this cluster. Please check the config key jobmanager.web.submit.enable.\"}";
+
+	@Override
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		return ERROR_MESSAGE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
new file mode 100644
index 0000000..e185003
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract handler for fetching plan for a jar or running a jar.
+ *
+ * <p>Subclasses get two helpers: {@link #getJobGraphAndClassLoader(Map, Map)} turns the request
+ * parameters into a {@link JobGraph}, and {@link #sendError(Exception)} renders an exception as
+ * a JSON error response.
+ */
+public abstract class JarActionHandler implements RequestHandler, RequestHandler.JsonResponse {
+
+	/** Directory against which the requested jar ids are resolved. */
+	private final File jarDir;
+
+	/** Sink that discards everything printed while the plan of the user program is generated. */
+	private static final PrintStream nullStream = new PrintStream(new NullPrintStream());
+
+	public JarActionHandler(File jarDirectory) {
+		jarDir = jarDirectory;
+	}
+
+	/**
+	 * Builds the job graph for the jar named by the request.
+	 *
+	 * @param pathParams
+	 *        Must contain <tt>jarid</tt>, the file name of the jar inside the jar directory.
+	 * @param queryParams
+	 *        May contain <tt>program-args</tt>, <tt>entry-class</tt> and <tt>parallelism</tt>;
+	 *        missing or empty values fall back to no arguments, no explicit entry class and a
+	 *        parallelism of 1.
+	 * @return The job graph together with the user code class loader of the program.
+	 * @throws IllegalArgumentException
+	 *         If the jar id is missing or is not a plain file name, if the parallelism is not a
+	 *         positive number, or if the program arguments contain an unterminated quote.
+	 * @throws CompilerException
+	 *         If no job graph could be derived from the generated plan.
+	 * @throws ProgramInvocationException
+	 *         If the location of one of the program's libraries cannot be converted to a path.
+	 */
+	protected Tuple2<JobGraph, ClassLoader> getJobGraphAndClassLoader(Map<String, String> pathParams, Map<String, String> queryParams) throws Exception {
+		final String file = pathParams.get("jarid");
+		if (file == null) {
+			throw new IllegalArgumentException("No jarid was provided.");
+		}
+		// The jar id is taken from the request. Only accept a plain file name, so that the
+		// lookup below cannot be pointed at a location outside of the jar directory.
+		if (file.equals(".") || file.equals("..") || !file.equals(new File(file).getName())) {
+			throw new IllegalArgumentException("Invalid jarid: " + file);
+		}
+
+		// parse the optional parameters
+		final String programArgsOpt = queryParams.get("program-args");
+		final List<String> programArgs = (programArgsOpt != null && !programArgsOpt.equals(""))
+				? tokenizeArguments(programArgsOpt) : new ArrayList<String>();
+
+		final String entryClassOpt = queryParams.get("entry-class");
+		final String parallelismOpt = queryParams.get("parallelism");
+
+		int parallelism = 1;
+		String entryClass = null;
+
+		if (parallelismOpt != null && !parallelismOpt.equals("")) {
+			parallelism = Integer.parseInt(parallelismOpt);
+			if (parallelism < 1) {
+				throw new IllegalArgumentException("Parallelism must be a positive number.");
+			}
+		}
+
+		// get entry class
+		if (entryClassOpt != null && !entryClassOpt.equals("")) {
+			entryClass = entryClassOpt;
+		}
+
+		PackagedProgram program = new PackagedProgram(new File(jarDir, file), entryClass,
+				programArgs.toArray(new String[programArgs.size()]));
+		ClassLoader classLoader = program.getUserCodeClassLoader();
+		Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
+
+		// Silence stdout/stderr while the plan is generated. The original streams are restored
+		// in a finally block, so that a failing program does not leave the process muted.
+		// NOTE(review): System.out/System.err are process-wide. If requests can be handled
+		// concurrently, one request may save the null stream as "original" - confirm whether
+		// this section needs to be serialized.
+		PrintStream out = System.out;
+		PrintStream err = System.err;
+		System.setOut(nullStream);
+		System.setErr(nullStream);
+		FlinkPlan plan;
+		try {
+			plan = Client.getOptimizedPlan(compiler, program, parallelism);
+		} finally {
+			System.setOut(out);
+			System.setErr(err);
+		}
+
+		// generate the graph
+		JobGraph graph = null;
+		if (plan instanceof StreamingPlan) {
+			graph = ((StreamingPlan) plan).getJobGraph();
+		} else if (plan instanceof OptimizedPlan) {
+			graph = new JobGraphGenerator().compileJobGraph((OptimizedPlan) plan);
+		}
+		if (graph == null) {
+			throw new CompilerException("A valid job graph couldn't be generated for the jar.");
+		}
+		for (URL jar : program.getAllLibraries()) {
+			try {
+				graph.addJar(new Path(jar.toURI()));
+			} catch (URISyntaxException e) {
+				// keep the cause, so that the offending URL shows up in the error report
+				throw new ProgramInvocationException("Invalid jar path. Unexpected error. :(", e);
+			}
+		}
+		return Tuple2.of(graph, classLoader);
+	}
+
+	/**
+	 * Utility method that takes the given arguments, splits them at the whitespaces (space and tab) and
+	 * turns them into a list of Strings. Other than the <tt>StringTokenizer</tt>, this method
+	 * takes care of quotes, such that quoted passages end up being one string. The quote
+	 * characters themselves are not part of the result.
+	 *
+	 * @param args
+	 *        The string to be split.
+	 * @return The list of split strings.
+	 * @throws IllegalArgumentException
+	 *         If a quote is opened but never closed.
+	 */
+	private static List<String> tokenizeArguments(String args) {
+		List<String> list = new ArrayList<String>();
+		StringBuilder curr = new StringBuilder();
+
+		int pos = 0;
+		boolean quoted = false;
+
+		while (pos < args.length()) {
+			char c = args.charAt(pos);
+			if ((c == ' ' || c == '\t') && !quoted) {
+				// an unquoted separator ends the current token, if there is one
+				if (curr.length() > 0) {
+					list.add(curr.toString());
+					curr.setLength(0);
+				}
+			} else if (c == '"') {
+				quoted = !quoted;
+			} else {
+				curr.append(c);
+			}
+
+			pos++;
+		}
+
+		if (quoted) {
+			throw new IllegalArgumentException("Unterminated quoted string.");
+		}
+
+		if (curr.length() > 0) {
+			list.add(curr.toString());
+		}
+
+		return list;
+	}
+
+	/**
+	 * Renders the given exception as a JSON object with a single <tt>error</tt> field.
+	 *
+	 * <p>For {@link ProgramInvocationException}, {@link CompilerException} and
+	 * {@link IllegalArgumentException} a shortened report is written: the exception, its cause
+	 * (if any) and the stack trace of the cause (or of the exception itself), cut off after the
+	 * first frame of a method named <tt>handleRequest</tt>. Any other exception is reported with
+	 * its full stack trace.
+	 *
+	 * @param e
+	 *        The exception to report.
+	 * @return The JSON error response.
+	 */
+	protected String sendError(Exception e) throws Exception {
+		StringWriter sw = new StringWriter();
+		PrintWriter p = new PrintWriter(sw);
+		if (e instanceof ProgramInvocationException || e instanceof CompilerException || e instanceof IllegalArgumentException) {
+			p.println(e.getClass().getSimpleName() + ((e.getMessage() != null) ? ": " + e.getMessage() : ""));
+			Throwable cause = e.getCause();
+			if (cause != null) {
+				p.println(cause.toString());
+			} else {
+				cause = e;
+			}
+
+			for (StackTraceElement traceElement: cause.getStackTrace()) {
+				p.println("\tat " + traceElement);
+				if (traceElement.getMethodName().equals("handleRequest")) {
+					break;
+				}
+			}
+		} else {
+			// if not something we expected, dump the entire stack trace.
+			e.printStackTrace(p);
+		}
+		p.close();
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+		gen.writeStartObject();
+		gen.writeStringField("error", sw.toString());
+		gen.writeEndObject();
+		gen.close();
+		return writer.toString();
+	}
+
+	/** Output stream that discards every byte written to it. */
+	private static final class NullPrintStream extends OutputStream {
+		@Override
+		public void write(int x) {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
new file mode 100644
index 0000000..770e7e9
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Handles requests for deletion of jars.
+ *
+ * <p>The jar is named by the <tt>jarid</tt> path parameter and is looked up by exact file name
+ * among the direct children of the jar directory.
+ */
+public class JarDeleteHandler implements RequestHandler, RequestHandler.JsonResponse {
+
+	private final File jarDir;
+
+	public JarDeleteHandler(File jarDirectory) {
+		jarDir = jarDirectory;
+	}
+
+	/**
+	 * Deletes every file in the jar directory whose name equals the requested jar id.
+	 *
+	 * @param pathParams Supplies the <tt>jarid</tt> of the jar to delete.
+	 * @param queryParams Not used.
+	 * @param jobManager Not used.
+	 * @return An empty JSON object if at least one file was deleted, otherwise a JSON object
+	 *         with an <tt>error</tt> field.
+	 * @throws RuntimeException If anything fails unexpectedly; the cause is attached.
+	 */
+	@Override
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		final String file = pathParams.get("jarid");
+		try {
+			File[] list = jarDir.listFiles(new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					return name.equals(file);
+				}
+			});
+			boolean success = false;
+			// listFiles() returns null when the jar directory cannot be listed; nothing can be
+			// deleted then and the request is answered with the regular error response.
+			if (list != null) {
+				for (File f : list) {
+					// although next to impossible for multiple files, we still delete them.
+					// delete() is the left operand on purpose: with "success || f.delete()" the
+					// short-circuit would skip all deletions after the first successful one.
+					success = f.delete() || success;
+				}
+			}
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+			gen.writeStartObject();
+			if (!success) {
+				// this seems to always fail on Windows.
+				gen.writeStringField("error", "The requested jar couldn't be deleted. Please try again.");
+			}
+			gen.writeEndObject();
+			gen.close();
+			return writer.toString();
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Failed to delete jar id " + pathParams.get("jarid") + ": " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
new file mode 100644
index 0000000..980ed13
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.UUID;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+/**
+ * Handles requests for the list of uploaded jars.
+ *
+ * <p>Only files named <tt>&lt;uuid&gt;_&lt;name&gt;.jar</tt> with a non-empty name part are
+ * listed. For each jar, the entry classes announced in its manifest are reported, as far as a
+ * {@link PackagedProgram} can be created for them.
+ */
+public class JarListHandler implements RequestHandler, RequestHandler.JsonResponse {
+
+	private final File jarDir;
+
+	public  JarListHandler(File jarDirectory) {
+		jarDir = jarDirectory;
+	}
+
+	/**
+	 * Writes the jar list as JSON.
+	 *
+	 * @param pathParams Not used.
+	 * @param queryParams Supplies the web monitor address that is echoed in the response.
+	 * @param jobManager Not used.
+	 * @return A JSON object with the fields <tt>address</tt> and <tt>files</tt>.
+	 * @throws RuntimeException If the response could not be generated; the cause is attached.
+	 */
+	@Override
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		try {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+			gen.writeStartObject();
+			gen.writeStringField("address", queryParams.get(RuntimeMonitorHandler.WEB_MONITOR_ADDRESS_KEY));
+			gen.writeArrayFieldStart("files");
+
+			File[] list = jarDir.listFiles(new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					return name.endsWith(".jar");
+				}
+			});
+			if (list == null) {
+				// listFiles() returns null when the directory cannot be listed; report no jars.
+				list = new File[0];
+			}
+			for (File f : list) {
+				// separate the uuid and the name parts.
+				String id = f.getName();
+				int startIndex = id.indexOf("_");
+				if (startIndex < 0) {
+					continue;
+				}
+				try {
+					UUID.fromString(id.substring(0, startIndex));
+				} catch (IllegalArgumentException e) {
+					// the prefix is not a uuid, so the file is not listed
+					continue;
+				}
+				String name = id.substring(startIndex + 1);
+				if (name.equals(".jar")) {
+					// nothing but the extension is left after the uuid
+					continue;
+				}
+				gen.writeStartObject();
+				gen.writeStringField("id", id);
+				gen.writeStringField("name", name);
+				gen.writeNumberField("uploaded", f.lastModified());
+				gen.writeArrayFieldStart("entry");
+				// show every entry class that can be loaded later on.
+				for (String clazz : readEntryClasses(f)) {
+					clazz = clazz.trim();
+					String desc;
+					try {
+						// Resolve everything that may fail before anything is written, so that
+						// a failure never leaves a half-written JSON object behind.
+						PackagedProgram program = new PackagedProgram(f, clazz, new String[0]);
+						desc = program.getDescription();
+					} catch (ProgramInvocationException e) {
+						// the class cannot be used as an entry point; leave it out of the list
+						continue;
+					}
+					gen.writeStartObject();
+					gen.writeStringField("name", clazz);
+					gen.writeStringField("description", desc == null ? "No description provided" : desc);
+					gen.writeEndObject();
+				}
+				gen.writeEndArray();
+				gen.writeEndObject();
+			}
+			gen.writeEndArray();
+			gen.writeEndObject();
+			gen.close();
+			return writer.toString();
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Failed to fetch jar list: " + e.getMessage(), e);
+		}
+	}
+
+	/**
+	 * Reads the comma-separated entry classes from the manifest of the given jar. The assembler
+	 * class attribute takes precedence over the main class attribute.
+	 *
+	 * @param jarFile The jar to inspect.
+	 * @return The untrimmed class names; empty if the jar has no manifest, names no class, or
+	 *         cannot be read.
+	 */
+	private static String[] readEntryClasses(File jarFile) {
+		String[] classes = new String[0];
+		// try-with-resources releases the file handle that was previously left open
+		try (JarFile jar = new JarFile(jarFile)) {
+			Manifest manifest = jar.getManifest();
+			String assemblerClass = null;
+
+			if (manifest != null) {
+				assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
+				if (assemblerClass == null) {
+					assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+				}
+			}
+			if (assemblerClass != null) {
+				classes = assemblerClass.split(",");
+			}
+		} catch (IOException ignored) {
+			// best effort: an unreadable jar is still listed, just without entry classes
+		}
+		return classes;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
new file mode 100644
index 0000000..0d7aa0e
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+
+import java.io.File;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Handler that answers with the JSON plan of the job contained in an uploaded jar.
+ */
+public class JarPlanHandler extends JarActionHandler {
+
+	public JarPlanHandler(File jarDirectory) {
+		super(jarDirectory);
+	}
+
+	/**
+	 * Builds the job graph for the requested jar and wraps its JSON plan in an object with a
+	 * single <tt>plan</tt> field.
+	 *
+	 * @param pathParams Passed on to the job graph generation.
+	 * @param queryParams Passed on to the job graph generation.
+	 * @param jobManager Not used.
+	 * @return The plan response, or the error response if anything fails on the way.
+	 */
+	@Override
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		try {
+			final JobGraph jobGraph = getJobGraphAndClassLoader(pathParams, queryParams).f0;
+
+			final StringWriter json = new StringWriter();
+			final JsonGenerator generator = JsonFactory.jacksonFactory.createJsonGenerator(json);
+
+			generator.writeStartObject();
+			generator.writeFieldName("plan");
+			// the generated plan is JSON already, so it is embedded verbatim
+			generator.writeRawValue(JsonPlanGenerator.generatePlan(jobGraph));
+			generator.writeEndObject();
+			generator.close();
+
+			return json.toString();
+		} catch (Exception failure) {
+			return sendError(failure);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
new file mode 100644
index 0000000..482725f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * This handler handles requests to run a jar: it builds the job graph for the requested jar,
+ * uploads the jar files to the job manager and submits the job through
+ * <tt>JobClient.submitJobDetached</tt>.
+ */
+public class JarRunHandler extends JarActionHandler {
+
+	/** Timeout handed to the upload and submit calls. */
+	private final FiniteDuration timeout;
+
+	public JarRunHandler(File jarDirectory, FiniteDuration timeout) {
+		super(jarDirectory);
+		this.timeout = timeout;
+	}
+
+	/**
+	 * Runs the requested jar.
+	 *
+	 * @param pathParams Passed on to the job graph generation.
+	 * @param queryParams Passed on to the job graph generation.
+	 * @param jobManager The gateway used for uploading the jar files and submitting the job.
+	 * @return A JSON object with the <tt>jobid</tt> of the submitted job, or the error response
+	 *         if any step fails.
+	 */
+	@Override
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		try {
+			final Tuple2<JobGraph, ClassLoader> graphAndLoader = getJobGraphAndClassLoader(pathParams, queryParams);
+			final JobGraph jobGraph = graphAndLoader.f0;
+			final ClassLoader userCodeLoader = graphAndLoader.f1;
+
+			try {
+				JobClient.uploadJarFiles(jobGraph, jobManager, timeout);
+			} catch (IOException uploadFailure) {
+				throw new ProgramInvocationException("Failed to upload jar files to the job manager", uploadFailure);
+			}
+
+			try {
+				JobClient.submitJobDetached(jobManager, jobGraph, timeout, userCodeLoader);
+			} catch (JobExecutionException submitFailure) {
+				throw new ProgramInvocationException("Failed to submit the job to the job manager", submitFailure);
+			}
+
+			return jobIdResponse(jobGraph);
+		} catch (Exception failure) {
+			return sendError(failure);
+		}
+	}
+
+	/** Renders the id of the given job graph as a JSON object with a single jobid field. */
+	private static String jobIdResponse(JobGraph jobGraph) throws Exception {
+		final StringWriter json = new StringWriter();
+		final JsonGenerator generator = JsonFactory.jacksonFactory.createJsonGenerator(json);
+		generator.writeStartObject();
+		generator.writeStringField("jobid", jobGraph.getJobID().toString());
+		generator.writeEndObject();
+		generator.close();
+		return json.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
new file mode 100644
index 0000000..2889c2a
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Handles requests for uploading of jars.
+ *
+ * <p>This handler does not receive the file content. It only checks the file named by the
+ * <tt>file</tt> query parameter inside the jar directory and reports the outcome.
+ */
+public class JarUploadHandler implements RequestHandler, RequestHandler.JsonResponse {
+
+	private final File jarDir;
+
+	public JarUploadHandler(File jarDir) {
+		this.jarDir = jarDir;
+	}
+
+	/**
+	 * Checks the uploaded file.
+	 *
+	 * @param pathParams Not used.
+	 * @param queryParams Supplies the name of the uploaded file under the key <tt>file</tt>.
+	 * @param jobManager Not used.
+	 * @return <tt>{}</tt> if the file exists and its name ends with <tt>.jar</tt>; otherwise a
+	 *         JSON object with an <tt>error</tt> field. An existing file with a different
+	 *         ending is deleted.
+	 */
+	@Override
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		final String uploadedName = queryParams.get("file");
+		if (uploadedName == null) {
+			return "{\"error\": \"Failed to upload the file.\"}";
+		}
+
+		// NOTE(review): the name is joined to the jar directory unchecked - confirm that the
+		// caller only ever passes plain file names here.
+		final File uploaded = new File(jarDir, uploadedName);
+		if (!uploaded.exists()) {
+			return "{\"error\": \"Failed to upload the file.\"}";
+		}
+
+		if (uploaded.getName().endsWith(".jar")) {
+			return "{}";
+		}
+
+		// anything but a jar is removed again; the outcome of the deletion is not checked
+		uploaded.delete();
+		return "{\"error\": \"Only Jar files are allowed.\"}";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index 20f28bb..396e96d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -28,9 +28,9 @@ import java.util.Map;
 public class JobCancellationHandler implements RequestHandler, RequestHandler.JsonResponse {
 
 	@Override
-	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
-			JobID jobid = new JobID(StringUtils.hexStringToByte(params.get("jobid")));
+			JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
 			if (jobManager != null) {
 				jobManager.tell(new JobManagerMessages.CancelJob(jobid));
 				return "";
@@ -40,7 +40,7 @@ public class JobCancellationHandler implements RequestHandler, RequestHandler.Js
 			}
 		}
 		catch (Exception e) {
-			throw new RuntimeException("Failed to cancel the job with id: "  + params.get("jobid") + e.getMessage(), e);
+			throw new RuntimeException("Failed to cancel the job with id: "  + pathParams.get("jobid") + e.getMessage(), e);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 3a0c774..a692243 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -37,7 +37,7 @@ public class JobManagerConfigHandler implements RequestHandler, RequestHandler.J
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> params, ActorGateway jobManagerGateway) throws Exception {
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 53d1179..9c47544 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -37,5 +37,5 @@ public interface RequestHandler {
 	
 	// --------------------------------------------------------------------------------------------
 
-	String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception;
+	String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index 0becb6a..c44b42b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -48,16 +48,16 @@ public class TaskManagersHandler implements RequestHandler, RequestHandler.JsonR
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		try {
 			if (jobManager != null) {
 				// whether one task manager's metrics are requested, or all task manager, we
 				// return them in an array. This avoids unnecessary code complexity.
 				// If only one task manager is requested, we only fetch one task manager metrics.
 				final List<Instance> instances = new ArrayList<>();
-				if (params.containsKey(TASK_MANAGER_ID_KEY)) {
+				if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
 					try {
-						InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(params.get(TASK_MANAGER_ID_KEY)));
+						InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
 						Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout);
 						TaskManagerInstance instance = (TaskManagerInstance) Await.result(future, timeout);
 						if (instance.instance().nonEmpty()) {
@@ -94,7 +94,7 @@ public class TaskManagersHandler implements RequestHandler, RequestHandler.JsonR
 					gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
 
 					// only send metrics when only one task manager requests them.
-					if (params.containsKey(TASK_MANAGER_ID_KEY)) {
+					if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
 						byte[] report = instance.getLastMetricsReport();
 						if (report != null) {
 							gen.writeFieldName("metrics");

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/web-dashboard/app/index.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/index.jade b/flink-runtime-web/web-dashboard/app/index.jade
index a87faed..ceb5079 100644
--- a/flink-runtime-web/web-dashboard/app/index.jade
+++ b/flink-runtime-web/web-dashboard/app/index.jade
@@ -66,6 +66,11 @@ html(lang='en')
               i.fa.fa-server.fa-fw
               | 
               | Job Manager
+          li
+            a(ui-sref="submit" ui-sref-active='active')
+              i.fa.fa-upload.fa-fw
+              | 
+              | Submit new Job
 
     #content(ng-class="{ 'sidebar-visible': sidebarVisible }")
       div(ui-view='main')

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/web-dashboard/app/partials/submit.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/submit.jade b/flink-runtime-web/web-dashboard/app/partials/submit.jade
new file mode 100644
index 0000000..7fdea88
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/partials/submit.jade
@@ -0,0 +1,127 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+nav.navbar.navbar-default.navbar-fixed-top.navbar-main
+  #fold-button.btn.btn-default.navbar-btn.pull-left(ng-click='showSidebar()')
+    i.fa.fa-navicon
+
+  .navbar-title
+    | Submit new Job
+
+#content-inner(ng-if="noaccess")
+  pre
+    | {{noaccess}}
+#content-inner(ng-if="!noaccess && yarn")
+  pre(ng-if="!address")
+    | Yarn's AM proxy doesn't allow file uploads. Please wait while we fetch an alternate url for you to use
+  pre(ng-if="address")
+    | Yarn's AM proxy doesn't allow file uploads. You can visit&nbsp;
+    a(href="http://{{address}}/#/submit")
+      | here
+    | &nbsp;to access this functionality.
+
+#content-inner(ng-if="!noaccess && jars && !yarn")
+  .panel.panel-default
+    .panel-heading
+      h3.panel-title
+        | Uploaded Jars
+
+    .panel-body
+      table.table
+        thead
+          tr
+            th
+            th Name
+            th Upload Time
+            th Entry Class
+            th
+
+        tbody
+          tr(ng-repeat="jar in jars track by jar.id")
+            td
+              span.icon-wrapper
+                i.show-pointer.fa(ng-click="selectJar(jar.id)" ng-class="state.selected | getJarSelectClass:jar.id")
+            td {{jar.name}}
+            td {{jar.uploaded | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}
+            td
+              div(ng-repeat="entries in jar.entry")
+                span.btn.btn-default.row-button(title="{{entries.description}}" ng-click="loadEntryClass(entries.name)")
+                  | {{entries.name}}
+            td
+              span.icon-wrapper
+                i.show-pointer.fa.fa-remove(title="Delete" ng-click="deleteJar($event, jar.id)")
+
+  table.table.table-no-border#job-submit-table(ng-if="state.selected")
+    tbody
+      tr
+        td.td-large
+          input.form-control(type="text" placeholder="Entry Class" title="Entry Class" ng-model="state['entry-class']")
+        td
+          input.form-control(type="text" placeholder="Parallelism" title="Parallelism" ng-model="state.parallelism")
+        td
+          span.btn.btn-default#fetch-plan(ng-click="getPlan()")
+            | {{state['plan-button']}}
+          | &nbsp;
+          i.fa.fa-spin.fa-spinner(ng-if="state['plan-button'] == 'Getting Plan'")
+      tr
+        td.td-large(colspan="2")
+          input.form-control(type="text" placeholder="Program Arguments" title="Program Arguments" ng-model="state['program-args']")
+        td
+          span.btn.btn-success.btn-sm#job-submit(ng-click="runJob()")
+            | {{state['submit-button']}}
+          | &nbsp;
+          i.fa.fa-spin.fa-spinner(ng-if="state['submit-button'] == 'Submitting'")
+
+  table.table.table-no-border(ng-if="jid")
+    tbody
+      tr
+        td.text-center
+          | Job was successfully submitted. To monitor,&nbsp;
+          a(href="{{'#/jobs/' + jid}}")
+            | click here.
+
+  table.table.table-no-border(ng-if="error")
+    tbody
+      tr
+        td
+          pre
+            | {{error}}
+
+  .canvas-wrapper(ng-if="plan")
+    div.main-canvas(job-plan, plan="plan", jobid="{{plan.jid}}", set-node="changeNode(nodeid)")
+
+  table.table.table-no-border#add-file-table(ng-if="!state.selected")
+    tbody
+      tr
+        td#add-file-button
+          span.btn.btn-default.btn-file(ng-click="clearFiles()")
+            | Add New&nbsp;
+            i.fa.fa-plus
+            input(type="file" onchange="angular.element(this).scope().uploadFiles(this.files)")
+        td#add-file-name(ng-if="uploader.file" title="{{uploader.file.name}}")
+          | {{uploader.file.name}}
+        td#add-file-status(ng-if="uploader.file")
+          span.error-area(ng-if="uploader.error")
+            i
+              | {{uploader.error}}
+          span.btn.btn-success(ng-click="startUpload()" ng-if="uploader.upload")
+            | &nbsp;Upload&nbsp;
+          span(ng-if="uploader.success")
+            | {{uploader.success}}
+          span.btn.btn-progress-bar(ng-if="uploader.progress")
+            span.btn.btn-success.btn-progress(ng-style="{width: uploader.progress + '%'}")
+              | {{uploader.progress}}%

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index 6564ee8..07e13ff 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -193,4 +193,11 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
         templateUrl: "partials/jobmanager/log.html"
         controller: 'JobManagerLogsController'
 
+  .state "submit",
+      url: "/submit"
+      views:
+        main:
+          templateUrl: "partials/submit.html"
+          controller: "JobSubmitController"
+
   $urlRouterProvider.otherwise "/overview"

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
new file mode 100644
index 0000000..f9ece62
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
@@ -0,0 +1,244 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Controller for the "Submit new Job" page: keeps the list of uploaded jars up
+# to date, tracks the selected jar and its run parameters, requests plans and
+# job submissions, and uploads new jar files. The 'getJarSelectClass' filter
+# defined at the end picks the checkbox icon class for a row of the jar table.
+angular.module('flinkApp')
+
+.controller 'JobSubmitController', ($scope, JobSubmitService, $interval, flinkConfig, $state, $location) ->
+  # True when the page URL carries the "/proxy/application_" marker; the template
+  # then shows a notice about YARN's AM proxy instead of the upload form.
+  $scope.yarn = $location.absUrl().indexOf("/proxy/application_") != -1
+
+  # Turns a rejected request into a displayable message, preferring an error
+  # text found at reason.data.error over the given fallback.
+  failureMessage = (reason, fallback) ->
+    reason?.data?.error ? fallback
+
+  # Runs fn inside a digest (scheduling one if none is in progress), so that scope
+  # changes made from plain DOM or XHR callbacks reach the view right away.
+  inDigest = (fn) ->
+    $scope.$evalAsync(fn)
+
+  # Query parameters shared by the plan and the run request.
+  jobParams = () ->
+    {
+      'entry-class': $scope.state['entry-class'],
+      parallelism: $scope.state.parallelism,
+      'program-args': $scope.state['program-args']
+    }
+
+  # Fetches the jar listing and publishes it to the template.
+  $scope.loadList = () ->
+    JobSubmitService.loadJarList().then (data) ->
+      $scope.address = data.address
+      $scope.noaccess = data.error
+      $scope.jars = data.files
+
+  # Resets plan, error and the form state of the current selection.
+  $scope.defaultState = () ->
+    $scope.plan = null
+    $scope.error = null
+    $scope.state = {
+      selected: null,
+      parallelism: "",
+      'entry-class': "",
+      'program-args': "",
+      'plan-button': "Show Plan",
+      'submit-button': "Submit",
+      'action-time': 0
+    }
+
+  $scope.defaultState()
+  $scope.uploader = {}
+  $scope.loadList()
+
+  # Re-fetch the jar list periodically; the timer is cancelled with the scope.
+  refresh = $interval ->
+    $scope.loadList()
+  , flinkConfig["refresh-interval"]
+
+  $scope.$on '$destroy', ->
+    $interval.cancel(refresh)
+
+  # Selects the given jar, or deselects it if it was already selected. The form
+  # state is reset in both cases.
+  $scope.selectJar = (id) ->
+    wasSelected = $scope.state.selected == id
+    $scope.defaultState()
+    $scope.state.selected = id unless wasSelected
+
+  # Deletes a jar, showing a spinner in place of the delete icon meanwhile.
+  $scope.deleteJar = (event, id) ->
+    if $scope.state.selected == id
+      $scope.defaultState()
+    # Resolve the icon now: a native event's currentTarget is reset to null once
+    # dispatch has finished, i.e. before the request completes.
+    icon = angular.element(event.currentTarget)
+    icon.removeClass("fa-remove").addClass("fa-spin fa-spinner")
+    restoreIcon = () ->
+      icon.removeClass("fa-spin fa-spinner").addClass("fa-remove")
+    onDeleted = (data) ->
+      restoreIcon()
+      if data.error?
+        alert(data.error)
+    onFailure = (reason) ->
+      restoreIcon()
+      alert(failureMessage(reason, "Could not delete the jar"))
+    JobSubmitService.deleteJar(id).then(onDeleted, onFailure)
+
+  # Copies a clicked entry class name into the form.
+  $scope.loadEntryClass = (name) ->
+    $scope.state['entry-class'] = name
+
+  # Requests the plan of the selected jar. 'action-time' stamps the request so
+  # that an answer is dropped when another action was started in the meantime.
+  $scope.getPlan = () ->
+    if $scope.state['plan-button'] == "Show Plan"
+      action = new Date().getTime()
+      $scope.state['action-time'] = action
+      $scope.state['submit-button'] = "Submit"
+      $scope.state['plan-button'] = "Getting Plan"
+      $scope.error = null
+      $scope.plan = null
+      onPlan = (data) ->
+        if action == $scope.state['action-time']
+          $scope.state['plan-button'] = "Show Plan"
+          $scope.error = data.error
+          $scope.plan = data.plan
+      onFailure = (reason) ->
+        # Without this the button would stay on "Getting Plan" forever.
+        if action == $scope.state['action-time']
+          $scope.state['plan-button'] = "Show Plan"
+          $scope.error = failureMessage(reason, "Could not fetch the plan")
+      JobSubmitService.getPlan($scope.state.selected, jobParams()).then(onPlan, onFailure)
+
+  # Submits the selected jar and, on success, navigates to the new job's page.
+  $scope.runJob = () ->
+    if $scope.state['submit-button'] == "Submit"
+      action = new Date().getTime()
+      $scope.state['action-time'] = action
+      $scope.state['submit-button'] = "Submitting"
+      $scope.state['plan-button'] = "Show Plan"
+      $scope.error = null
+      onSubmitted = (data) ->
+        if action == $scope.state['action-time']
+          $scope.state['submit-button'] = "Submit"
+          $scope.error = data.error
+          if data.jobid?
+            $state.go("single-job.plan.overview", {jobid: data.jobid})
+      onFailure = (reason) ->
+        # Without this the button would stay on "Submitting" forever.
+        if action == $scope.state['action-time']
+          $scope.state['submit-button'] = "Submit"
+          $scope.error = failureMessage(reason, "Could not submit the job")
+      JobSubmitService.runJob($scope.state.selected, jobParams()).then(onSubmitted, onFailure)
+
+  # job plan display related stuff
+  $scope.nodeid = null
+  # Toggles the node shown in the plan view: a new node id is selected and a
+  # 'reload' is broadcast, the current node id is deselected.
+  $scope.changeNode = (nodeid) ->
+    if nodeid != $scope.nodeid
+      $scope.nodeid = nodeid
+      $scope.vertex = null
+      $scope.subtasks = null
+      $scope.accumulators = null
+
+      $scope.$broadcast 'reload'
+
+    else
+      $scope.nodeid = null
+      $scope.nodeUnfolded = false
+      $scope.vertex = null
+      $scope.subtasks = null
+      $scope.accumulators = null
+
+  # Drops any pending file selection and upload status.
+  $scope.clearFiles = () ->
+    $scope.uploader = {}
+
+  # Called from the file input's plain DOM onchange handler, hence inDigest.
+  $scope.uploadFiles = (files) ->
+    inDigest ->
+      # make sure everything is clear again.
+      $scope.uploader = {}
+      if files.length == 1
+        $scope.uploader['file'] = files[0]
+        # Regex instead of String::endsWith, which older browsers lack.
+        if /\.jar$/.test(files[0].name)
+          $scope.uploader['upload'] = true
+        else
+          $scope.uploader['error'] = "Only Jar files are allowed"
+      else
+        $scope.uploader['error'] = "Did ya forget to select a file?"
+
+  # Uploads the selected file as multipart form field "jarfile" and mirrors the
+  # transfer state into the uploader object the template renders.
+  $scope.startUpload = () ->
+    if $scope.uploader['file']?
+      # Keep writing to this upload's own state even if the user starts over.
+      uploader = $scope.uploader
+      formdata = new FormData()
+      formdata.append("jarfile", uploader['file'])
+      uploader['upload'] = false
+      uploader['success'] = "Initializing upload..."
+      xhr = new XMLHttpRequest()
+      xhr.upload.onprogress = (event) ->
+        # Without a known total there is no percentage to show.
+        if event.lengthComputable and event.total > 0
+          percent = Math.floor(100 * event.loaded / event.total)
+          inDigest ->
+            uploader['success'] = null
+            uploader['progress'] = percent
+      xhr.upload.onerror = (event) ->
+        inDigest ->
+          uploader['progress'] = null
+          uploader['error'] = "An error occurred while uploading your file"
+      xhr.upload.onload = (event) ->
+        inDigest ->
+          uploader['progress'] = null
+          uploader['success'] = "Saving..."
+      xhr.onreadystatechange = () ->
+        if xhr.readyState == 4
+          # A failed request has no JSON body; report it instead of throwing.
+          try
+            response = JSON.parse(xhr.responseText)
+          catch parseError
+            response = {error: "An error occurred while uploading your file"}
+          inDigest ->
+            if response.error?
+              uploader['error'] = response.error
+              uploader['success'] = null
+            else
+              uploader['success'] = "Uploaded!"
+      # Relative URL, like the other jar requests, so the dashboard also works
+      # when it is not served from the server root.
+      xhr.open("POST", "jars/upload")
+      xhr.send(formdata)
+    else
+      console.log("Unexpected Error. This should not happen")
+
+.filter 'getJarSelectClass', ->
+  (selected, actual) ->
+    if selected == actual
+      "fa-check-square"
+    else
+      "fa-square-o"

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
new file mode 100644
index 0000000..33e57a2
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Thin wrapper around the jar REST endpoints used by the "Submit new Job" page.
+# Every method returns a promise for the response body; when the request
+# fails the promise is rejected with the $http response instead of staying
+# pending forever.
+angular.module('flinkApp')
+
+.service 'JobSubmitService', ($http, flinkConfig, $q) ->
+
+  # Unwraps the body of a successful response; failures pass through as rejections.
+  body = (request) ->
+    request.then (response) -> response.data
+
+  # Lists the uploaded jars.
+  @loadJarList = () ->
+    body($http.get("jars/"))
+
+  # Deletes the jar with the given id.
+  @deleteJar = (id) ->
+    body($http.delete("jars/" + id))
+
+  # Fetches the plan for the given jar; args are sent as query parameters.
+  @getPlan = (id, args) ->
+    body($http.get("jars/" + id + "/plan", {params: args}))
+
+  # Runs the given jar; args are sent as query parameters of an empty POST.
+  @runJob = (id, args) ->
+    body($http.post("jars/" + id + "/run", {}, {params: args}))
+
+  @

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/web-dashboard/app/styles/index.styl
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/styles/index.styl b/flink-runtime-web/web-dashboard/app/styles/index.styl
index 36d2beb..7cde98a 100644
--- a/flink-runtime-web/web-dashboard/app/styles/index.styl
+++ b/flink-runtime-web/web-dashboard/app/styles/index.styl
@@ -177,12 +177,88 @@ sidebar-width-small = 160px
       
     td
       position: relative
-      
+
+  &.table-no-border
+    th
+      border-top: none !important
+    td
+      border-top: none !important
+
+  &#job-submit-table
+    table-layout: fixed
+    white-space: nowrap
+    td.td-large
+      width: 40%
+    td
+      width: 15%
+      > input
+        height: 28px
+        font-size: 14px
+      > span.btn
+        padding: 2px 4px
+        font-size: 14px
+
+  &#add-file-table
+    table-layout: fixed
+
+    span.btn
+      position: relative
+      overflow: hidden
+      padding: 2px 4px
+      font-size: 14px
+      border-radius: 2px
+      margin-top: -3px
+
+    td#add-file-button
+      width: 100px
+
+      input[type=file]
+        position: absolute
+        top: 0
+        right: 0
+        min-width: 100%
+        min-height: 100%
+        filter: unquote("alpha(opacity=0)")
+        opacity: 0
+        outline: none
+        cursor: inherit
+        display: block
+
+    td#add-file-name
+      width: 250px
+      text-overflow: ellipsis
+      overflow: hidden
+      white-space: nowrap
+
+    td#add-file-status
+      width: 100%
+
+      span.btn-progress-bar
+        padding: 0 !important
+        width: 100%
+        background-color: #f5f5f5
+        text-align: left
+      span.btn-progress
+        padding: 2px 2px
+        font-size: 10px
+
+
+  span.error-area
+    color: red
+
+  span.row-button
+    padding: 1px 2px
+    margin: 0
+    border: none !important
+
   .small-label
     text-transform: uppercase
     font-size: 13px
     color: #999999
 
+span.icon-wrapper
+  width: 1.2em
+  display: inline-block
 
 .panel
   &.panel-dashboard
@@ -330,6 +406,13 @@ pre.exception
   padding: 0
   margin: 0
 
+pre
+  white-space: pre-wrap
+  white-space: -moz-pre-wrap
+  white-space: -pre-wrap
+  white-space: -o-pre-wrap
+  word-wrap: break-word
+
 .nav-tabs
   &.tabs-vertical
     // float: right
@@ -427,3 +510,11 @@ livechart
 .show-pointer
   cursor: pointer
 
+#fetch-plan
+  width: 100px
+
+#job-submit
+  width: 100px
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/web-dashboard/web/css/index.css
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/css/index.css b/flink-runtime-web/web-dashboard/web/css/index.css
index 5a7c44f..52a5631 100644
--- a/flink-runtime-web/web-dashboard/web/css/index.css
+++ b/flink-runtime-web/web-dashboard/web/css/index.css
@@ -196,11 +196,96 @@
 .table.table-activable td {
   position: relative;
 }
+.table.table-no-border th {
+  border-top: none !important;
+}
+.table.table-no-border td {
+  border-top: none !important;
+}
+.table#job-submit-table {
+  table-layout: fixed;
+  white-space: nowrap;
+}
+.table#job-submit-table td.td-large {
+  width: 40%;
+}
+.table#job-submit-table td {
+  width: 15%;
+}
+.table#job-submit-table td > input {
+  height: 28px;
+  font-size: 14px;
+}
+.table#job-submit-table td > span.btn {
+  padding: 2px 4px;
+  font-size: 14px;
+}
+.table#add-file-table {
+  table-layout: fixed;
+}
+.table#add-file-table span.btn {
+  position: relative;
+  overflow: hidden;
+  padding: 2px 4px;
+  font-size: 14px;
+  -webkit-border-radius: 2px;
+  border-radius: 2px;
+  margin-top: -3px;
+}
+.table#add-file-table td#add-file-button {
+  width: 100px;
+}
+.table#add-file-table td#add-file-button input[type=file] {
+  position: absolute;
+  top: 0;
+  right: 0;
+  min-width: 100%;
+  min-height: 100%;
+  filter: alpha(opacity=0);
+  opacity: 0;
+  -ms-filter: "progid:DXImageTransform.Microsoft.Alpha(Opacity=0)";
+  filter: alpha(opacity=0);
+  outline: none;
+  cursor: inherit;
+  display: block;
+}
+.table#add-file-table td#add-file-name {
+  width: 250px;
+  -o-text-overflow: ellipsis;
+  text-overflow: ellipsis;
+  overflow: hidden;
+  white-space: nowrap;
+}
+.table#add-file-table td#add-file-status {
+  width: 100%;
+}
+.table#add-file-table td#add-file-status span.btn-progress-bar {
+  padding: 0 !important;
+  width: 100%;
+  background-color: #f5f5f5;
+  text-align: left;
+}
+.table#add-file-table td#add-file-status span.btn-progress {
+  padding: 2px 2px;
+  font-size: 10px;
+}
+.table span.error-area {
+  color: #f00;
+}
+.table span.row-button {
+  padding: 1px 2px;
+  margin: 0;
+  border: none !important;
+}
 .table .small-label {
   text-transform: uppercase;
   font-size: 13px;
   color: #999;
 }
+span.icon-wrapper {
+  width: 1.2em;
+  display: inline-block;
+}
 .panel.panel-dashboard .huge {
   font-size: 28px;
 }
@@ -390,6 +475,13 @@ pre.exception {
   padding: 0;
   margin: 0;
 }
+pre {
+  white-space: pre-wrap;
+  white-space: -moz-pre-wrap;
+  white-space: -pre-wrap;
+  white-space: -o-pre-wrap;
+  word-wrap: break-word;
+}
 .nav-tabs.tabs-vertical {
   position: absolute;
   left: 0;
@@ -603,3 +695,9 @@ svg.graph .node-label {
 .show-pointer {
   cursor: pointer;
 }
+#fetch-plan {
+  width: 100px;
+}
+#job-submit {
+  width: 100px;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/72f9a6c6/flink-runtime-web/web-dashboard/web/index.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/index.html b/flink-runtime-web/web-dashboard/web/index.html
index a6df249..e3055b3 100644
--- a/flink-runtime-web/web-dashboard/web/index.html
+++ b/flink-runtime-web/web-dashboard/web/index.html
@@ -45,6 +45,8 @@ limitations under the License.
               Task Managers</a></li>
           <li><a ui-sref="jobmanager.config" ui-sref-active="active"><i class="fa fa-server fa-fw"></i> 
               Job Manager</a></li>
+          <li><a ui-sref="submit" ui-sref-active="active"><i class="fa fa-upload fa-fw"></i> 
+              Submit new Job</a></li>
         </ul>
       </div>
     </div>


Mime
View raw message