Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 26D7A17777 for ; Thu, 5 Feb 2015 22:48:58 +0000 (UTC) Received: (qmail 92265 invoked by uid 500); 5 Feb 2015 22:48:58 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 92229 invoked by uid 500); 5 Feb 2015 22:48:58 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 92220 invoked by uid 99); 5 Feb 2015 22:48:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2015 22:48:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CBE68E01D9; Thu, 5 Feb 2015 22:48:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2018. App Tracking and History URL should point to the Tez UI. (Prakash Ramachandran via hitesh) Date: Thu, 5 Feb 2015 22:48:57 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.6 87a99d022 -> e0d3599ce TEZ-2018. App Tracking and History URL should point to the Tez UI. (Prakash Ramachandran via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e0d3599c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e0d3599c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e0d3599c Branch: refs/heads/branch-0.6 Commit: e0d3599ce1499e85aebdf0163a2940964c830a4a Parents: 87a99d0 Author: Hitesh Shah Authored: Thu Feb 5 14:48:44 2015 -0800 Committer: Hitesh Shah Committed: Thu Feb 5 14:48:44 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + pom.xml | 2 +- .../apache/tez/dag/api/TezConfiguration.java | 37 ++ tez-dag/pom.xml | 22 ++ .../java/org/apache/tez/dag/app/AppContext.java | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 29 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 52 ++- .../apache/tez/dag/app/web/AMWebController.java | 354 +++++++++++++++++++ .../apache/tez/dag/app/web/WebUIService.java | 161 +++++++++ .../tez/dag/app/rm/TestContainerReuse.java | 7 + .../app/rm/TestTaskSchedulerEventHandler.java | 31 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 2 +- .../tez/dag/app/web/TestAMWebController.java | 169 +++++++++ 13 files changed, 855 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0e97974..2caa2c3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2018. App Tracking and History URL should point to the Tez UI. TEZ-2035. Make timeline server putDomain exceptions non-fatal - work-around TEZ-1929. pre-empted tasks should be marked as killed instead of failed TEZ-2017. TEZ UI - Dag view throwing error whild re-displaying additionals in some dags. http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 99ef06b..b87696e 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ true ${user.home}/clover.license 2.6.0 - 7.6.16.v20140903 + 6.1.26 apache.snapshots.https Apache Development Snapshot Repository https://repository.apache.org/content/repositories/snapshots http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 9a90209..9d9aa86 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -967,4 +967,41 @@ public class TezConfiguration extends Configuration { + "allow.disabled.timeline-domains"; public static final boolean TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT = false; + /** + * String value + * Tez UI URL template for the application. + * Expert level setting. + * + * The AM will redirect the user to the Tez UI via this url. Template supports the following + * parameters to be replaced with the actual runtime information: + * + * __APPLICATION_ID__ : Replaces this with application ID + * __HISTORY_URL_BASE__: replaces this with TEZ_HISTORY_URL_BASE + * + * For example, "http://uihost:9001/#/tez-app/__APPLICATION_ID__/ will be replaced to + * http://uihost:9001/#/tez-app/application_1421880306565_0001/ + */ + public static final String TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE = TEZ_AM_PREFIX + + "tez-ui.history-url.template"; + public static final String TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT = + "__HISTORY_URL_BASE__/#/tez-app/__APPLICATION_ID__"; + + /** + * String value + * Tez-UI Url base. This gets replaced in the TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE + * ex http://ui-host:9001 or if its hosted with a prefix http://ui-host:9001/~user + * if the ui is hosted on the default port (80 for http and 443 for https), the port should not + * be specified. + */ + public static final String TEZ_HISTORY_URL_BASE = TEZ_PREFIX + + "tez-ui.history-url.base"; + + /** + * String value + * Allow disabling of the Tez AM webservice. If set to false the Tez-UI wont show progress + * updates for running application. + */ + public static final String TEZ_AM_WEBSERVICE_ENABLE = TEZ_AM_PREFIX + + "tez-ui.webservice.enable"; + public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true; } http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/pom.xml ---------------------------------------------------------------------- diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml index a80d6da..5010b6a 100644 --- a/tez-dag/pom.xml +++ b/tez-dag/pom.xml @@ -77,6 +77,11 @@ hadoop-yarn-client + org.apache.hadoop + hadoop-yarn-server-web-proxy + ${hadoop.version} + + org.apache.commons commons-math3 @@ -131,6 +136,23 @@ org.codehaus.jettison jettison + + org.mortbay.jetty + jetty + compile + ${jetty.version} + + + org.mortbay.jetty + jetty-util + compile + ${jetty.version} + + + javax.servlet + servlet-api + compile + http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 5cedc56..23ae931 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -93,4 +93,5 @@ public interface AppContext { ACLManager getAMACLManager(); + String getAMUser(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index fc193a5..0a58d2f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -142,6 +142,7 @@ import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; import org.apache.tez.dag.app.rm.node.AMNodeEventType; import org.apache.tez.dag.app.rm.node.AMNodeTracker; +import org.apache.tez.dag.app.web.WebUIService; import org.apache.tez.common.security.ACLManager; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; @@ -221,6 +222,7 @@ public class DAGAppMaster extends AbstractService { private DagEventDispatcher dagEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; private TaskSchedulerEventHandler taskSchedulerEventHandler; + private WebUIService webUIService; private HistoryEventHandler historyEventHandler; private final Map amResources = new HashMap(); private final Map cumulativeAdditionalResources = new HashMap(); @@ -410,9 +412,24 @@ public class DAGAppMaster extends AbstractService { addIfService(speculatorDispatcher, true); dispatcher.register(SpeculatorEventType.class, speculatorDispatcher.getEventHandler()); + + if (enableWebUIService()) { + this.webUIService = new WebUIService(context); + addIfService(webUIService, false); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Web UI Service is not enabled."); + } + } + this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context, - clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher); + clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService); addIfService(taskSchedulerEventHandler, true); + + if (enableWebUIService()) { + addIfServiceDependency(taskSchedulerEventHandler, webUIService); + } + if (isLastAMRetry) { LOG.info("AM will unregister as this is the last attempt" + ", currentAttempt=" + appAttemptID.getAttemptId() @@ -1320,6 +1337,11 @@ public class DAGAppMaster extends AbstractService { } @Override + public String getAMUser() { + return appMasterUgi.getShortUserName(); + } + + @Override public Map getApplicationACLs() { if (getServiceState() != STATE.STARTED) { throw new TezUncheckedException( @@ -2047,4 +2069,9 @@ public class DAGAppMaster extends AbstractService { synchronized void setDAGCounter(int dagCounter) { this.dagCounter.set(dagCounter); } + + private boolean enableWebUIService() { + return amConf.getBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, + TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 625b09e..616690c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -26,8 +26,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; @@ -69,6 +71,7 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated; import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged; import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded; import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded; +import org.apache.tez.dag.app.web.WebUIService; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import com.google.common.base.Preconditions; @@ -79,9 +82,13 @@ public class TaskSchedulerEventHandler extends AbstractService EventHandler { static final Log LOG = LogFactory.getLog(TaskSchedulerEventHandler.class); + static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__"; + static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__"; + protected final AppContext appContext; @SuppressWarnings("rawtypes") private final EventHandler eventHandler; + private final String historyUrl; protected TaskSchedulerService taskScheduler; private DAGAppMaster dagAppMaster; private Map appAcls = null; @@ -94,18 +101,25 @@ public class TaskSchedulerEventHandler extends AbstractService private int cachedNodeCount = -1; private AtomicBoolean shouldUnregisterFlag = new AtomicBoolean(false); + private final WebUIService webUI; BlockingQueue eventQueue = new LinkedBlockingQueue(); @SuppressWarnings("rawtypes") public TaskSchedulerEventHandler(AppContext appContext, - DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher) { + DAGClientServer clientService, EventHandler eventHandler, + ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { super(TaskSchedulerEventHandler.class.getName()); this.appContext = appContext; this.eventHandler = eventHandler; this.clientService = clientService; this.containerSignatureMatcher = containerSignatureMatcher; + this.webUI = webUI; + this.historyUrl = getHistoryUrl(); + if (this.webUI != null) { + this.webUI.setHistoryUrl(this.historyUrl); + } } public Map getApplicationAcls() { @@ -328,8 +342,13 @@ public class TaskSchedulerEventHandler extends AbstractService public synchronized void serviceStart() { InetSocketAddress serviceAddr = clientService.getBindAddress(); dagAppMaster = appContext.getAppMaster(); + // if web service is enabled then set tracking url. else disable it (value = ""). + // the actual url set on the rm web ui will be the proxy url set by WebAppProxyServlet, which + // always try to connect to AM and proxy the response. hence it wont work if the webUIService + // is not enabled. + String trackingUrl = (webUI != null) ? webUI.getURL() : ""; taskScheduler = createTaskScheduler(serviceAddr.getHostName(), - serviceAddr.getPort(), "", appContext); + serviceAddr.getPort(), trackingUrl, appContext); taskScheduler.init(getConfig()); taskScheduler.start(); if (shouldUnregisterFlag.get()) { @@ -514,11 +533,8 @@ public class TaskSchedulerEventHandler extends AbstractService LOG.debug("Setting job diagnostics to " + sb.toString()); } - String historyUrl = ""; - /*String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(), - appContext.getApplicationID()); - LOG.info("History url is " + historyUrl);*/ - + // if history url is set use the same, if historyUrl is set to "" then rm ui disables the + // history url return new AppFinalStatus(finishState, sb.toString(), historyUrl); } @@ -569,4 +585,26 @@ public class TaskSchedulerEventHandler extends AbstractService public boolean hasUnregistered() { return this.taskScheduler.hasUnregistered(); } + + @VisibleForTesting + public String getHistoryUrl() { + Configuration config = this.appContext.getAMConf(); + String historyUrl = ""; + + String loggingClass = config.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ""); + String historyUrlTemplate = config.get(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE, + TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT); + String historyUrlBase = config.get(TezConfiguration.TEZ_HISTORY_URL_BASE, ""); + + + if (loggingClass.equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") && + !historyUrlTemplate.isEmpty() && + !historyUrlBase.isEmpty()) { + historyUrl = historyUrlTemplate + .replaceAll(APPLICATION_ID_PLACEHOLDER, appContext.getApplicationID().toString()) + .replaceAll(HISTORY_URL_BASE, historyUrlBase); + } + + return historyUrl; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java new file mode 100644 index 0000000..b3e404a --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.web; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import com.google.inject.name.Named; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.webapp.Controller; +import org.apache.hadoop.yarn.webapp.MimeType; +import org.apache.hadoop.yarn.webapp.View; +import org.apache.hadoop.yarn.webapp.WebAppException; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; + +public class AMWebController extends Controller { + + private final static Log LOG = LogFactory.getLog(AMWebController.class); + + // HTTP CORS Response Headers + static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin"; + static final String ACCESS_CONTROL_ALLOW_CREDENTIALS = "Access-Control-Allow-Credentials"; + static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods"; + static final String ACCESS_CONTROL_ALLOW_HEADERS = "Access-Control-Allow-Headers"; + static final String ACCESS_CONTROL_MAX_AGE = "Access-Control-Max-Age"; + + // CORS default responses. + static final String ALLOWED_METHODS = "GET, HEAD"; + static final String ALLOWED_HEADERS = "X-Requested-With,Content-Type,Accept,Origin"; + + static final String DAG_PROGRESS = "dagProgress"; + static final String VERTEX_PROGRESS = "vertexProgress"; + static final String VERTEX_PROGRESSES = "vertexProgresses"; + + static final int MAX_VERTICES_QUERIED = 100; + + private AppContext appContext; + private String historyUrl; + + @Inject + public AMWebController(RequestContext requestContext, + AppContext appContext, + @Named("TezUIHistoryURL") String historyUrl) { + super(requestContext); + this.appContext = appContext; + this.historyUrl = historyUrl; + } + + @Override + public void index() { + ui(); + } + + public void ui() { + render(StaticAMView.class); + } + + public void main() { + ui(); + } + + public void about() { + renderJSON("Tez AM UI WebServices"); + } + + @VisibleForTesting + public void setCorsHeaders() { + final HttpServletResponse res = response(); + + /* + * ideally the Origin and other CORS headers should be checked and response headers set only + * if it matches the allowed origins. however rm does not forward these headers. + */ + String historyUrlBase = appContext.getAMConf().get(TezConfiguration.TEZ_HISTORY_URL_BASE, ""); + String origin = null; + try { + URL url = new URL(historyUrlBase); + origin = url.getProtocol() + "://" + url.getAuthority(); + } catch (MalformedURLException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Invalid url set for tez history url base: " + historyUrlBase, e); + } + } + + if (origin != null) { + res.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, origin); + } + res.setHeader(ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS); + res.setHeader(ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE.toString()); + res.setHeader(ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS); + res.setHeader(ACCESS_CONTROL_MAX_AGE, "1800"); + } + + void sendErrorResponse(int sc, String msg, Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + + try { + response().sendError(sc, msg); + } catch (IOException e1) { + throw new WebAppException(e); + } + } + + @VisibleForTesting + public boolean hasAccess() { + String remoteUser = request().getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null && !remoteUser.isEmpty()) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + + if (callerUGI != null && appContext.getAMACLManager().checkDAGViewAccess(callerUGI)) { + return false; + } + + return true; + } + + public void getDagProgress() { + + setCorsHeaders(); + + if (!hasAccess()) { + sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " + + request().getRemoteUser(), null); + return; + } + + int dagID; + try { + dagID = getQueryParamInt(WebUIService.DAG_ID); + } catch (NumberFormatException e) { + sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag id:", e); + return; + } + + DAG currentDAG = appContext.getCurrentDAG(); + + if (currentDAG == null || dagID != currentDAG.getID().getId()) { + sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + dagID, null); + return; + } + + Map result = new HashMap(); + result.put(DAG_PROGRESS, + new ProgressInfo(currentDAG.getID().toString(), currentDAG.getProgress())); + renderJSON(result); + } + + public void getVertexProgress() { + int dagID; + int vertexID; + + setCorsHeaders(); + + if (!hasAccess()) { + sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " + + request().getRemoteUser(), null); + return; + } + + try { + dagID = getQueryParamInt(WebUIService.DAG_ID); + vertexID = getQueryParamInt(WebUIService.VERTEX_ID); + } catch (NumberFormatException e) { + sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag or vertex id", e); + return; + } + + DAG currentDAG = appContext.getCurrentDAG(); + + if (currentDAG == null || currentDAG.getID().getId() != dagID) { + sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + dagID, null); + return; + } + + final TezVertexID tezVertexID = TezVertexID.getInstance(currentDAG.getID(), vertexID); + Vertex vertex = currentDAG.getVertex(tezVertexID); + if (vertex == null) { + sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "vertex not found: " + vertexID, null); + return; + } + + Map result = new HashMap(); + result.put(VERTEX_PROGRESS, new ProgressInfo(tezVertexID.toString(), vertex.getProgress())); + renderJSON(result); + } + + + Collection getVerticesByIdx(DAG dag, Collection indexes) { + Collection vertices = new ArrayList(indexes.size()); + final TezDAGID tezDAGID = dag.getID(); + + for (Integer idx : indexes) { + final TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, idx); + if (tezVertexID == null) { + continue; + } + final Vertex vertex = dag.getVertex(tezVertexID); + if (vertex != null) { + vertices.add(vertex); + } + } + + return vertices; + } + + int getQueryParamInt(String name) throws NumberFormatException { + final String valueStr = $(name).trim(); + + return Integer.parseInt(valueStr); + } + + public void getVertexProgresses() { + int dagID; + + setCorsHeaders(); + if (!hasAccess()) { + sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " + + request().getRemoteUser(), null); + return; + } + + List vertexIDs = new ArrayList(); + try { + dagID = getQueryParamInt(WebUIService.DAG_ID); + for (String vertexIDStr : $(WebUIService.VERTEX_ID).trim().split(",", MAX_VERTICES_QUERIED)) { + vertexIDs.add(Integer.parseInt(vertexIDStr)); + } + } catch (NumberFormatException e) { + sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag or vertices id", e); + return; + } + + DAG currentDAG = appContext.getCurrentDAG(); + if (currentDAG == null || currentDAG.getID().getId() != dagID) { + sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + dagID, null); + return; + } + + Collection vertices; + if (vertexIDs.isEmpty()) { + vertices = currentDAG.getVertices().values(); + } else { + vertices = getVerticesByIdx(currentDAG, vertexIDs); + } + + Collection progresses = new ArrayList(vertices.size()); + for(Vertex vertex : vertices) { + progresses.add(new ProgressInfo(vertex.getVertexId().toString(), vertex.getProgress())); + } + + Map> result = new HashMap>(); + result.put(VERTEX_PROGRESSES, progresses); + renderJSON(result); + } + + @Override + @VisibleForTesting + public void renderJSON(Object object) { + super.renderJSON(object); + } + + public static class StaticAMView extends View { + @Inject + AppContext appContext; + @Inject + @Named("TezUIHistoryURL") String historyUrl; + + @Override + public void render() { + response().setContentType(MimeType.HTML); + PrintWriter pw = writer(); + pw.write(""); + pw.write(""); + pw.write(""); + pw.write("Redirecting to Tez UI"); + pw.write(""); + pw.write(""); + if (historyUrl == null || historyUrl.isEmpty()) { + pw.write("

Tez UI Url is not defined.

" + + "

To enable tracking url pointing to Tez UI, set the config " + + TezConfiguration.TEZ_HISTORY_URL_BASE + " in the tez-site.xml.

"); + } else { + pw.write("

Redirecting to Tez UI

.

If you are not redirected shortly, click" + + "here

" + ); + pw.write(""); + } + pw.write(""); + pw.write(""); + pw.flush(); + } + } + + @VisibleForTesting + static class ProgressInfo { + private String id; + + public float getProgress() { + return progress; + } + + public String getId() { + return id; + } + + private float progress; + + public ProgressInfo(String id, float progress) { + this.id = id; + this.progress = progress; + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java new file mode 100644 index 0000000..44f99c8 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.web; + +import static org.apache.hadoop.yarn.util.StringHelper.pajoin; + +import java.net.InetSocketAddress; + +import com.google.common.base.Preconditions; +import com.google.inject.name.Names; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.webapp.WebApp; +import org.apache.hadoop.yarn.webapp.WebApps; +import org.apache.hadoop.yarn.webapp.YarnWebParams; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.AppContext; + +public class WebUIService extends AbstractService { + private static final String WS_PREFIX = "/ui/ws/v1/tez/"; + public static final String VERTEX_ID = "vertexID"; + public static final String DAG_ID = "dagID"; + + private static final Log LOG = LogFactory.getLog(WebUIService.class); + + private final AppContext context; + private TezAMWebApp tezAMWebApp; + private WebApp webApp; + private int port; + private String historyUrl = ""; + + public WebUIService(AppContext context) { + super(WebUIService.class.getName()); + this.context = context; + this.tezAMWebApp = new TezAMWebApp(context); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (historyUrl == null || historyUrl.isEmpty()) { + LOG.error("Tez UI History URL is not set"); + } else { + LOG.info("Tez UI History URL: " + historyUrl); + } + + if (tezAMWebApp != null) { + this.tezAMWebApp.setHistoryUrl(historyUrl); + } + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (tezAMWebApp != null) { + // use AmIpFilter to restrict connections only from the rm proxy + final Configuration conf = getConfig(); + conf.set("hadoop.http.filter.initializers", + "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); + try { + // Explicitly disabling SSL for the web service. For https we do not want AM users to allow + // access to the keystore file for opening SSL listener. We can trust RM/NM to issue SSL + // certificates, however AM user is not trusted. + this.webApp = WebApps + .$for(this.tezAMWebApp) + .with(conf) + .withHttpPolicy(conf, HttpConfig.Policy.HTTP_ONLY) + .start(this.tezAMWebApp); + this.port = this.webApp.httpServer().getConnectorAddress(0).getPort(); + } catch (Exception e) { + LOG.error("Tez UI WebService failed to start.", e); + throw new TezUncheckedException(e); + } + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.webApp != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping WebApp"); + } + this.webApp.stop(); + } + super.serviceStop(); + } + + public int getPort() { + return this.port; + } + + public String getURL() { + String url = ""; + InetSocketAddress address = webApp.getListenerAddress(); + + if (address != null) { + final String hostName = address.getAddress().getCanonicalHostName(); + final int port = address.getPort(); + url = "http://" + hostName + ":" + port + "/ui/"; + } + + return url; + } + + public String getHistoryUrl() { + return historyUrl; + } + + public void setHistoryUrl(String historyUrl) { + this.historyUrl = historyUrl; + } + + private static class TezAMWebApp extends WebApp implements YarnWebParams { + + private String historyUrl; + AppContext context; + + public TezAMWebApp(AppContext context) { + this.context = context; + } + + public void setHistoryUrl(String historyUrl) { + this.historyUrl = historyUrl; + } + + @Override + public void setup() { + Preconditions.checkArgument(historyUrl != null); + bind(AppContext.class).toInstance(context); + bind(String.class).annotatedWith(Names.named("TezUIHistoryURL")).toInstance(historyUrl); + route("/", AMWebController.class, "ui"); + route("/ui", AMWebController.class, "ui"); + route("/main", AMWebController.class, "main"); + route(WS_PREFIX + "about", AMWebController.class, "about"); + route(WS_PREFIX + pajoin("dagProgress", DAG_ID), AMWebController.class, "getDagProgress"); + route(WS_PREFIX + pajoin("vertexProgress", VERTEX_ID), AMWebController.class, + "getVertexProgress"); + route(WS_PREFIX + pajoin("vertexProgresses", VERTEX_ID, DAG_ID), AMWebController.class, + "getVertexProgresses"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 311e762..d0d3df8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -137,6 +137,7 @@ public class TestContainerReuse { doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); + doReturn(conf).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), @@ -274,6 +275,7 @@ public class TestContainerReuse { doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), @@ -378,6 +380,7 @@ public class TestContainerReuse { doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); @@ -517,6 +520,7 @@ public class TestContainerReuse { doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); @@ -707,6 +711,7 @@ public class TestContainerReuse { doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), @@ -835,6 +840,7 @@ public class TestContainerReuse { doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); AMContainerMap amContainerMap = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), @@ -956,6 +962,7 @@ public class TestContainerReuse { doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1); AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index 62618cc..2bb2fcd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.app.rm; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -31,6 +32,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl; @@ -49,6 +52,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.dag.app.web.WebUIService; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.junit.Assert; @@ -74,8 +78,8 @@ public class TestTaskSchedulerEventHandler { public MockTaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, - ContainerSignatureMatcher containerSignatureMatcher) { - super(appContext, clientService, eventHandler, containerSignatureMatcher); + ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { + super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI); } @Override @@ -101,19 +105,22 @@ public class TestTaskSchedulerEventHandler { MockTaskSchedulerEventHandler schedulerHandler; TaskSchedulerService mockTaskScheduler; AMContainerMap mockAMContainerMap; + WebUIService mockWebUIService; @Before public void setup() { mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + doReturn(new Configuration(false)).when(mockAppContext).getAMConf(); mockClientService = mock(DAGClientServer.class); mockEventHandler = new TestEventHandler(); mockSigMatcher = mock(ContainerSignatureMatcher.class); mockTaskScheduler = mock(TaskSchedulerService.class); mockAMContainerMap = mock(AMContainerMap.class); + mockWebUIService = mock(WebUIService.class); when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap); when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000)); schedulerHandler = new MockTaskSchedulerEventHandler( - mockAppContext, mockClientService, mockEventHandler, mockSigMatcher); + mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService); } @Test (timeout = 5000) @@ -246,4 +253,22 @@ public class TestTaskSchedulerEventHandler { schedulerHandler.close(); } + @Test (timeout = 5000) + public void testHistoryUrlConf() throws Exception { + Configuration conf = schedulerHandler.appContext.getAMConf(); + + // ensure history url is empty when timeline server is not the logging class + conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9999"); + Assert.assertTrue("".equals(schedulerHandler.getHistoryUrl())); + + // ensure expansion of url happens + conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService"); + final ApplicationId mockApplicationId = mock(ApplicationId.class); + doReturn("TEST_APP_ID").when(mockApplicationId).toString(); + doReturn(mockApplicationId).when(mockAppContext).getApplicationID(); + Assert.assertTrue("http://ui-host:9999/#/tez-app/TEST_APP_ID" + .equals(schedulerHandler.getHistoryUrl())); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index b0ea644..bec5320 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -127,7 +127,7 @@ class TestTaskSchedulerHelpers { EventHandler eventHandler, TezAMRMClientAsync amrmClientAsync, ContainerSignatureMatcher containerSignatureMatcher) { - super(appContext, null, eventHandler, containerSignatureMatcher); + super(appContext, null, eventHandler, containerSignatureMatcher, null); this.amrmClientAsync = amrmClientAsync; this.containerSignatureMatcher = containerSignatureMatcher; } http://git-wip-us.apache.org/repos/asf/tez/blob/e0d3599c/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java new file mode 100644 index 0000000..588eb21 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.web; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.webapp.Controller; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; + +public class TestAMWebController { + AppContext mockAppContext; + Controller.RequestContext mockRequestContext; + HttpServletResponse mockResponse; + HttpServletRequest mockRequst; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + mockAppContext = mock(AppContext.class); + Configuration conf = new Configuration(false); + conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://uihost:9001/foo"); + when(mockAppContext.getAMConf()).thenReturn(conf); + mockRequestContext = mock(Controller.RequestContext.class); + mockResponse = mock(HttpServletResponse.class); + mockRequst = mock(HttpServletRequest.class); + } + + @Test(timeout = 5000) + public void testCorsHeadersAreSet() { + AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext, + "TEST_HISTORY_URL"); + AMWebController spy = spy(amWebController); + doReturn(mockResponse).when(spy).response(); + spy.setCorsHeaders(); + + verify(mockResponse).setHeader("Access-Control-Allow-Origin", "http://uihost:9001"); + verify(mockResponse).setHeader("Access-Control-Allow-Credentials", "true"); + verify(mockResponse).setHeader("Access-Control-Allow-Methods", "GET, HEAD"); + verify(mockResponse).setHeader("Access-Control-Allow-Headers", + "X-Requested-With,Content-Type,Accept,Origin"); + } + + @Test (timeout = 5000) + public void sendErrorResponseIfNoAccess() throws Exception { + AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext, + "TEST_HISTORY_URL"); + AMWebController spy = spy(amWebController); + + doReturn(false).when(spy).hasAccess(); + doNothing().when(spy).setCorsHeaders(); + doReturn(mockResponse).when(spy).response(); + doReturn(mockRequst).when(spy).request(); + doReturn("dummyuser").when(mockRequst).getRemoteUser(); + + spy.getDagProgress(); + verify(mockResponse).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED), anyString()); + reset(mockResponse); + + spy.getVertexProgress(); + verify(mockResponse).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED), anyString()); + reset(mockResponse); + + spy.getVertexProgresses(); + verify(mockResponse).sendError(eq(HttpServletResponse.SC_UNAUTHORIZED), anyString()); + } + + @Captor + ArgumentCaptor> singleResultCaptor; + + @Test (timeout = 5000) + public void testDagProgressResponse() { + AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext, + "TEST_HISTORY_URL"); + AMWebController spy = spy(amWebController); + DAG mockDAG = mock(DAG.class); + + doReturn(true).when(spy).hasAccess(); + doNothing().when(spy).setCorsHeaders(); + doReturn("42").when(spy).$(WebUIService.DAG_ID); + doReturn(mockResponse).when(spy).response(); + doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID(); + doReturn(66.0f).when(mockDAG).getProgress(); + doReturn(mockDAG).when(mockAppContext).getCurrentDAG(); + doNothing().when(spy).renderJSON(any()); + spy.getDagProgress(); + verify(spy).renderJSON(singleResultCaptor.capture()); + + final Map result = singleResultCaptor.getValue(); + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("dagProgress")); + AMWebController.ProgressInfo progressInfo = result.get("dagProgress"); + Assert.assertTrue("dag_1422960590892_0007_42".equals(progressInfo.getId())); + Assert.assertEquals(66.0, progressInfo.getProgress(), 0.1); + } + + @Test (timeout = 5000) + public void testVertexProgressResponse() { + AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext, + "TEST_HISTORY_URL"); + AMWebController spy = spy(amWebController); + DAG mockDAG = mock(DAG.class); + Vertex mockVertex = mock(Vertex.class); + + doReturn(true).when(spy).hasAccess(); + doReturn("42").when(spy).$(WebUIService.DAG_ID); + doReturn("43").when(spy).$(WebUIService.VERTEX_ID); + doReturn(mockResponse).when(spy).response(); + + doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID(); + doReturn(mockDAG).when(mockAppContext).getCurrentDAG(); + doReturn(mockVertex).when(mockDAG).getVertex(any(TezVertexID.class)); + doReturn(66.0f).when(mockVertex).getProgress(); + doNothing().when(spy).renderJSON(any()); + doNothing().when(spy).setCorsHeaders(); + + spy.getVertexProgress(); + verify(spy).renderJSON(singleResultCaptor.capture()); + + final Map result = singleResultCaptor.getValue(); + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("vertexProgress")); + AMWebController.ProgressInfo progressInfo = result.get("vertexProgress"); + Assert.assertTrue("vertex_1422960590892_0007_42_43".equals(progressInfo.getId())); + Assert.assertEquals(66.0f, progressInfo.getProgress(), 0.1); + } +}