flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
Date Thu, 18 Aug 2016 16:04:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426701#comment-15426701
] 

ASF GitHub Bot commented on FLINK-4389:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2363#discussion_r75337340
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.metrics;
    +
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.messages.Messages;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
    +import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * The MetricFetcher can be used to fetch metrics from the JobManager and all registered
TaskManagers.
    + *
    + * Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided
that a sufficient time since
    + * the last call has passed.
    + */
    +public class MetricFetcher {
    +	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
    +
    +	private final JobManagerRetriever retriever;
    +	private final ExecutionContext ctx;
    +	private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(),
TimeUnit.MILLISECONDS);
    +
    +	private MetricStore metrics = new MetricStore();
    +
    +	private long lastUpdateTime;
    +
    +	/**
    +	 * Creates a MetricFetcher.
    +	 *
    +	 * @param retriever used to look up the JobManager gateway when metrics are fetched; must not be null
    +	 * @param ctx execution context handed to the callbacks registered on the request futures; must not be null
    +	 */
    +	public MetricFetcher(JobManagerRetriever retriever, ExecutionContext ctx) {
    +		// validate both arguments up front, then store them
    +		Preconditions.checkNotNull(retriever);
    +		Preconditions.checkNotNull(ctx);
    +		this.retriever = retriever;
    +		this.ctx = ctx;
    +	}
    +
    +	/**
    +	 * Gives access to the store in which this fetcher keeps the metrics it has received.
    +	 *
    +	 * @return the MetricStore held by this fetcher
    +	 */
    +	public MetricStore getMetricStore() {
    +		return this.metrics;
    +	}
    +
    +	/**
    +	 * Signals this MetricFetcher that the metrics are still in use and should be kept up to date.
    +	 *
    +	 * A new fetch is only started if more than 10 seconds have passed since the last one;
    +	 * otherwise the call returns without doing anything.
    +	 */
    +	public void update() {
    +		synchronized (this) {
    +			final long now = System.currentTimeMillis();
    +			final long elapsedMillis = now - lastUpdateTime;
    +			if (elapsedMillis <= 10000) {
    +				// no more than 10 seconds have passed since the last update
    +				return;
    +			}
    +			lastUpdateTime = now;
    +			fetchMetrics();
    +		}
    +	}
    +
    +	private void fetchMetrics() { // sends three requests to the JobManager and registers success callbacks that update the MetricStore
    +		try {
    +			Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort
= retriever.getJobManagerGatewayAndWebPort();
    +			if (jobManagerGatewayAndWebPort.isDefined()) { // without a JobManager gateway nothing is requested
    +				ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
    +
    +				/**
    +				 * Request the job details from the JobManager and keep only the stored metrics of jobs
    +				 * that appear in the reply as running or finished; metrics of all other jobs are removed.
    +				 *
    +				 * NOTE(review): the meaning of the two boolean arguments of RequestJobDetails is not
    +				 * visible here - presumably "include running" / "include finished"; confirm.
    +				 */
    +				jobManager.ask(new RequestJobDetails(true, true), timeout)
    +					.onSuccess(new OnSuccess<Object>() {
    +						@Override
    +						public void onSuccess(Object result) throws Throwable {
    +							MultipleJobsDetails details = (MultipleJobsDetails) result; // NOTE(review): assumes the reply is always a MultipleJobsDetails - confirm
    +							ArrayList<String> toRetain = new ArrayList<>();
    +							for (JobDetails job : details.getRunningJobs()) {
    +								toRetain.add(job.getJobId().toString());
    +							}
    +							for (JobDetails job : details.getFinishedJobs()) {
    +								toRetain.add(job.getJobId().toString());
    +							}
    +							synchronized (metrics) {
    +								metrics.jobs.keySet().retainAll(toRetain);
    +							}
    +						}
    +					}, ctx);
    +
    +				/**
    +				 * Request the metric dump from the JobManager; the reply is passed to addMetrics.
    +				 */
    +				jobManager.ask(Messages.getRequestMetrics(), timeout)
    +					.onSuccess(new OnSuccess<Object>() {
    +						@Override
    +						public void onSuccess(Object result) throws Throwable {
    +							addMetrics(result);
    +						}
    +					}, ctx);
    +
    +				/**
    +				 * Request the list of all registered TaskManagers from the JobManager and then
    +				 * request the metric dump from each of them; every dump is passed to addMetrics.
    +				 *
    +				 * All stored metrics that do not belong to a TaskManager in that list are removed.
    +				 */
    +				jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout)
    +					.onSuccess(new OnSuccess<Object>() {
    +						@Override
    +						public void onSuccess(Object result) throws Throwable {
    +							Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers)
result).asJavaIterable();
    +							List<String> activeTaskManagers = new ArrayList<>();
    +							for (Instance taskManager : taskManagers) {
    +								activeTaskManagers.add(taskManager.getId().toString());
    +								taskManager.getActorGateway().ask(Messages.getRequestMetrics(), timeout)
    +									.onSuccess(new OnSuccess<Object>() {
    +										@Override
    +										public void onSuccess(Object result) throws Throwable {
    +											addMetrics(result);
    +										}
    +									}, ctx);
    +							}
    +							synchronized (metrics) { // remove all metrics belonging to unregistered task
managers
    +								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
    +							}
    +						}
    +					}, ctx);
    +			}
    +		} catch (Exception e) { // NOTE(review): only onSuccess callbacks are registered above, so failed asks do not appear to be reported anywhere - confirm
    +			LOG.warn("Exception while fetching metrics.", e);
    +		}
    +	}
    +
    +	private void addMetrics(Object result) {
    +		Object[] data = (Object[]) result;
    +		for (int x = 0; x < data.length; ) {
    +			synchronized (metrics) {
    +				switch ((byte) data[x++]) {
    +					case 0:
    +					case 1:
    --- End diff --
    
    Could we introduce an enum (or something similar) for these values? Maybe we could use
the byte value as the ordinal of the corresponding enum value.


> Expose metrics to Webfrontend
> -----------------------------
>
>                 Key: FLINK-4389
>                 URL: https://issues.apache.org/jira/browse/FLINK-4389
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics, Webfrontend
>    Affects Versions: 1.1.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message