flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5870) Make handlers aware of their REST URLs
Date Tue, 28 Feb 2017 13:55:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888051#comment-15888051 ]

ASF GitHub Bot commented on FLINK-5870:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3376#discussion_r103452837
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
    @@ -256,117 +256,106 @@ public WebRuntimeMonitor(
     		RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
     		RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());
     
    -		router = new Router()
    -			// config how to interact with this web server
    -			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
    -
    -			// the overview - how many task managers, slots, free slots, ...
    -			.GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))
    -
    -			// job manager configuration
    -			.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))
    -
    -			// overview over jobs
    -			.GET("/joboverview", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)))
    -			.GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false)))
    -			.GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true)))
    -
    -			.GET("/jobs", handler(new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT)))
    -
    -			.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs, metricFetcher)))
    -			.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs, metricFetcher)))
    -
    -			.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs, metricFetcher)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
    -							currentGraphs,
    -							backPressureStatsTracker,
    -							refreshInterval)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher)))
    -			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))
    -
    -			.GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher)))
    -
    -			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
    -			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)))
    -			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log",
    -				new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
    -					TaskManagerLogHandler.FileMode.LOG, config, enableSSL))
    -			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout",
    -				new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
    -					TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL))
    -			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagerMetricsHandler(metricFetcher)))
    +		router = new Router();
    +		// config how to interact with this web server
    +		GET(router, new DashboardConfigHandler(cfg.getRefreshInterval()));
    +
    +		// the overview - how many task managers, slots, free slots, ...
    +		GET(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
    +
    +		// job manager configuration
    +		GET(router, new JobManagerConfigHandler(config));
    +
    +		// overview over jobs
    +		GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
    +		GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
    +		GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));
    +
    +		GET(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));
    +
    +		GET(router, new JobDetailsHandler(currentGraphs, metricFetcher));
    +
    +		GET(router, new JobVertexDetailsHandler(currentGraphs, metricFetcher));
    +		GET(router, new SubtasksTimesHandler(currentGraphs));
    +		GET(router, new JobVertexTaskManagersHandler(currentGraphs, metricFetcher));
    +		GET(router, new JobVertexAccumulatorsHandler(currentGraphs));
    +		GET(router, new JobVertexBackPressureHandler(currentGraphs,	backPressureStatsTracker, refreshInterval));
    +		GET(router, new JobVertexMetricsHandler(metricFetcher));
    +		GET(router, new SubtasksAllAccumulatorsHandler(currentGraphs));
    +		GET(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher));
    +		GET(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher));
    +		GET(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs));
    +
    +		GET(router, new JobPlanHandler(currentGraphs));
    +		GET(router, new JobConfigHandler(currentGraphs));
    +		GET(router, new JobExceptionsHandler(currentGraphs));
    +		GET(router, new JobAccumulatorsHandler(currentGraphs));
    +		GET(router, new JobMetricsHandler(metricFetcher));
    +
    +		GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
    +		GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
    +				TaskManagerLogHandler.FileMode.LOG, config, enableSSL));
    +		GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
    +				TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL));
    +		GET(router, new TaskManagerMetricsHandler(metricFetcher));
     
    +		router
     			// log and stdout
     			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
     				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile,
     					enableSSL))
     
     			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
     				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
    -					enableSSL))
    -
    -			.GET("/jobmanager/metrics", handler(new JobManagerMetricsHandler(metricFetcher)))
    +					enableSSL));
     
    -			// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
    -			.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
    +		GET(router, new JobManagerMetricsHandler(metricFetcher));
     
    -			// DELETE is the preferred way of canceling a job (Rest-conform)
    -			.DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler()))
    +		// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
    +		GET(router, new JobCancellationHandler());
    +		// DELETE is the preferred way of canceling a job (Rest-conform)
    +		DELETE(router, new JobCancellationHandler());
     
    -			.GET("/jobs/:jobid/cancel-with-savepoint", triggerHandler)
    -			.GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory", triggerHandler)
    -			.GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler)
    +		GET(router, triggerHandler);
    +		GET(router, inProgressHandler);
     
    -			// stop a job via GET (for proper integration with YARN this has to be performed via GET)
    -			.GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler()))
    -
    -			// DELETE is the preferred way of stopping a job (Rest-conform)
    -			.DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()));
    +		// stop a job via GET (for proper integration with YARN this has to be performed via GET)
    +		GET(router, new JobStoppingHandler());
    +		// DELETE is the preferred way of stopping a job (Rest-conform)
    +		DELETE(router, new JobStoppingHandler());
     
     		int maxCachedEntries = config.getInteger(
     				ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
     			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
     		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
     
     		// Register the checkpoint stats handlers
    -		router
    -			.GET("/jobs/:jobid/checkpoints", handler(new CheckpointStatsHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/checkpoints/config", handler(new CheckpointConfigHandler(currentGraphs)))
    -			.GET("/jobs/:jobid/checkpoints/details/:checkpointid", handler(new CheckpointStatsDetailsHandler(currentGraphs, cache)))
    -			.GET("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", handler(new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache)));
    +		GET(router, new CheckpointStatsHandler(currentGraphs));
    +		GET(router, new CheckpointConfigHandler(currentGraphs));
    +		GET(router, new CheckpointStatsDetailsHandler(currentGraphs, cache));
    +		GET(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache));
     
     		if (webSubmitAllow) {
    -			router
    -				// fetch the list of uploaded jars.
    -				.GET("/jars", handler(new JarListHandler(uploadDir)))
    +			// fetch the list of uploaded jars.
    +			GET(router, new JarListHandler(uploadDir));
     
    -				// get plan for an uploaded jar
    -				.GET("/jars/:jarid/plan", handler(new JarPlanHandler(uploadDir)))
    +			// get plan for an uploaded jar
    +			GET(router, new JarPlanHandler(uploadDir));
     
    -				// run a jar
    -				.POST("/jars/:jarid/run", handler(new JarRunHandler(uploadDir, timeout, config)))
    +			// run a jar
    +			POST(router, new JarRunHandler(uploadDir, timeout, config));
     
    -				// upload a jar
    -				.POST("/jars/upload", handler(new JarUploadHandler(uploadDir)))
    +			// upload a jar
    +			POST(router, new JarUploadHandler(uploadDir));
     
    -				// delete an uploaded jar from submission interface
    -				.DELETE("/jars/:jarid", handler(new JarDeleteHandler(uploadDir)));
    +			// delete an uploaded jar from submission interface
    +			DELETE(router, new JarDeleteHandler(uploadDir));
     		} else {
    -			router
    -				// send an Access Denied message (sort of)
    -				// Every other GET request will go to the File Server, which will not provide
    -				// access to the jar directory anyway, because it doesn't exist in webRootDir.
    -				.GET("/jars", handler(new JarAccessDeniedHandler()));
    +			// send an Access Denied message
    +			JarAccessDeniedHandler jad = new JarAccessDeniedHandler();
    +			GET(router, jad);
    +			POST(router, jad);
    --- End diff --
    
    Why do we add the POST and DELETE here?


> Make handlers aware of their REST URLs
> --------------------------------------
>
>                 Key: FLINK-5870
>                 URL: https://issues.apache.org/jira/browse/FLINK-5870
>             Project: Flink
>          Issue Type: Improvement
>          Components: Webfrontend
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: 1.3.0
>
>
> The handlers in the WebRuntimeMonitor are currently unaware of the actual REST URL used. The handlers are simply registered under a given URL, without any guarantee that the handler can actually deal with that URL.
> I propose to let handlers themselves specify under which URLs they are supposed to be reachable. This provides a tighter coupling between URL and handler.
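
For illustration, the proposal in the issue description might look roughly like the sketch below. It is not the code from the pull request: PathAwareHandler, SimpleRouter, and the get() helper are hypothetical stand-ins (the real WebRuntimeMonitor registers handlers on a Netty-based Router), and only the two JobDetailsHandler paths are taken from the diff above. The point is that the handler declares its own paths and the registration helper merely iterates over them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these types are Flink's actual classes.
final class HandlerRegistrationSketch {

    /** A handler that declares the REST paths it is able to serve. */
    interface PathAwareHandler {
        String[] getPaths();
    }

    /** Minimal stand-in for a router: maps "METHOD path" to a handler. */
    static final class SimpleRouter {
        private final Map<String, PathAwareHandler> routes = new HashMap<>();

        void register(String method, String path, PathAwareHandler handler) {
            // putIfAbsent returns the existing mapping, or null if the key was free.
            PathAwareHandler previous = routes.putIfAbsent(method + " " + path, handler);
            if (previous != null) {
                throw new IllegalStateException("Duplicate route: " + method + " " + path);
            }
        }

        PathAwareHandler lookup(String method, String path) {
            return routes.get(method + " " + path);
        }
    }

    /** Example handler; both paths appear in the removed lines of the diff. */
    static final class JobDetailsHandler implements PathAwareHandler {
        @Override
        public String[] getPaths() {
            return new String[] {"/jobs/:jobid", "/jobs/:jobid/vertices"};
        }
    }

    /** Registers the handler for GET under every path it declares. */
    static void get(SimpleRouter router, PathAwareHandler handler) {
        for (String path : handler.getPaths()) {
            router.register("GET", path, handler);
        }
    }

    public static void main(String[] args) {
        SimpleRouter router = new SimpleRouter();
        JobDetailsHandler handler = new JobDetailsHandler();
        get(router, handler);
        System.out.println(router.lookup("GET", "/jobs/:jobid/vertices") == handler);
    }
}
```

With this shape, a single get(router, handler) call replaces one registration line per URL, and a handler can no longer be registered under a path it does not claim to support.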



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
