flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
Date Mon, 09 Oct 2017 09:13:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16196713#comment-16196713
] 

ASF GitHub Bot commented on FLINK-7709:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143415293
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property
= "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class,
name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name
= "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
@Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    +		this.status = Preconditions.checkNotNull(status);
    +		this.savepoint = savepoint;
    +		this.triggerTimestamp = triggerTimestamp;
    +		this.latestAckTimestamp = latestAckTimestamp;
    +		this.stateSize = stateSize;
    +		this.duration = duration;
    +		this.alignmentBuffered = alignmentBuffered;
    +		this.numSubtasks = numSubtasks;
    +		this.numAckSubtasks = numAckSubtasks;
    +		this.checkpointStatisticsPerTask = checkpointStatisticsPerTask;
    +	}
    +
    +	public long getId() {
    +		return id;
    +	}
    +
    +	public CheckpointStatsStatus getStatus() {
    +		return status;
    +	}
    +
    +	public boolean isSavepoint() {
    +		return savepoint;
    +	}
    +
    +	public long getTriggerTimestamp() {
    +		return triggerTimestamp;
    +	}
    +
    +	public long getLatestAckTimestamp() {
    +		return latestAckTimestamp;
    +	}
    +
    +	public long getStateSize() {
    +		return stateSize;
    +	}
    +
    +	public long getDuration() {
    +		return duration;
    +	}
    +
    +	public long getAlignmentBuffered() {
    +		return alignmentBuffered;
    +	}
    +
    +	public int getNumSubtasks() {
    +		return numSubtasks;
    +	}
    +
    +	public int getNumAckSubtasks() {
    +		return numAckSubtasks;
    +	}
    +
    +	@Nullable
    +	public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask()
{
    +		return checkpointStatisticsPerTask;
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +		CheckpointStatistics that = (CheckpointStatistics) o;
    +		return id == that.id &&
    +			savepoint == that.savepoint &&
    +			triggerTimestamp == that.triggerTimestamp &&
    +			latestAckTimestamp == that.latestAckTimestamp &&
    +			stateSize == that.stateSize &&
    +			duration == that.duration &&
    +			alignmentBuffered == that.alignmentBuffered &&
    +			numSubtasks == that.numSubtasks &&
    +			numAckSubtasks == that.numAckSubtasks &&
    +			status == that.status &&
    +			Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize,
duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
    +	}
    +
    +	public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats
checkpointStats, boolean includeTaskCheckpointStatistics) {
    +		if (checkpointStats != null) {
    +
    +			Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +			if (includeTaskCheckpointStatistics) {
    +				Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
    +
    +				checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
    +
    +				for (TaskStateStats taskStateStat : taskStateStats) {
    +					checkpointStatisticsPerTask.put(
    +						taskStateStat.getJobVertexId(),
    +						new TaskCheckpointStatistics(
    +							taskStateStat.getLatestAckTimestamp(),
    +							taskStateStat.getStateSize(),
    +							taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
    +							taskStateStat.getAlignmentBuffered(),
    +							taskStateStat.getNumberOfSubtasks(),
    +							taskStateStat.getNumberOfAcknowledgedSubtasks()));
    +				}
    +			} else {
    +				checkpointStatisticsPerTask = null;
    +			}
    +
    +			if (checkpointStats instanceof CompletedCheckpointStats) {
    +				final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats)
checkpointStats);
    +
    +				return new CheckpointStatistics.CompletedCheckpointStatistics(
    +					completedCheckpointStats.getCheckpointId(),
    +					completedCheckpointStats.getStatus(),
    +					completedCheckpointStats.getProperties().isSavepoint(),
    +					completedCheckpointStats.getTriggerTimestamp(),
    +					completedCheckpointStats.getLatestAckTimestamp(),
    +					completedCheckpointStats.getStateSize(),
    +					completedCheckpointStats.getEndToEndDuration(),
    +					completedCheckpointStats.getAlignmentBuffered(),
    +					completedCheckpointStats.getNumberOfSubtasks(),
    +					completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					completedCheckpointStats.getExternalPath(),
    +					completedCheckpointStats.isDiscarded());
    +			} else if (checkpointStats instanceof FailedCheckpointStats) {
    +				final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.FailedCheckpointStatistics(
    +					failedCheckpointStats.getCheckpointId(),
    +					failedCheckpointStats.getStatus(),
    +					failedCheckpointStats.getProperties().isSavepoint(),
    +					failedCheckpointStats.getTriggerTimestamp(),
    +					failedCheckpointStats.getLatestAckTimestamp(),
    +					failedCheckpointStats.getStateSize(),
    +					failedCheckpointStats.getEndToEndDuration(),
    +					failedCheckpointStats.getAlignmentBuffered(),
    +					failedCheckpointStats.getNumberOfSubtasks(),
    +					failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					failedCheckpointStats.getFailureTimestamp(),
    +					failedCheckpointStats.getFailureMessage());
    +			} else {
    +				throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName()
+ " cannot be converted.");
    +			}
    +		} else {
    +			return null;
    --- End diff --
    
    pretty sure this would lead to a NullPointerException, since neither the CheckpointStatshandler,
not AbstractExceutionGraphhandler, nor the AbstractRestHandler handle the case of the returned
value being null.


> Port CheckpointStatsDetailsHandler to new REST endpoint
> -------------------------------------------------------
>
>                 Key: FLINK-7709
>                 URL: https://issues.apache.org/jira/browse/FLINK-7709
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, REST, Webfrontend
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Till Rohrmann
>              Labels: flip-6
>             Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message