flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5852) Move JSON generation code into static methods
Date Tue, 28 Feb 2017 13:26:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887999#comment-15887999 ]

ASF GitHub Bot commented on FLINK-5852:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103431658
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java
---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    +		public static final String TASKMANAGERS = "taskmanagers";
    +		public static final String JOB_ID = "jid";
    +		public static final String ID = "id";
    +		public static final String NAME = "name";
    +		public static final String STATE = "state";
    +		public static final String IS_STOPPABLE = "isStoppable";
    +		public static final String PARALLELISM = "parallelism";
    +		public static final String PLAN = "plan";
    +
    +		public static final String START_TIME = "start-time";
    +		public static final String END_TIME = "end-time";
    +		public static final String DURATION = "duration";
    +		public static final String NOW = "now";
    +		public static final String LAST_MODIFICATION = "last-modification";
    +
    +		public static final String TIMESTAMP = "timestamp";
    +		public static final String TIMESTAMPS = "timestamps";
    +		public static final String STATUS_COUNTS = "status-counts";
    +
    +		public static final String REFRESH_INTERVAL = "refresh-interval";
    +		public static final String TIMEZONE_OFFSET = "timezone-offset";
    +		public static final String TIMEZONE_NAME = "timezone-name";
    +		public static final String FLINK_VERSION = "flink-version";
    +		public static final String FLINK_REVISION = "flink-revision";
    +
    +		public static final String EXECUTION_CONFIG = "execution-config";
    +		public static final String MODE = "mode";
    +		public static final String EXECUTION_MODE = "execution-mode";
    +		public static final String RESTART_STRATEGY = "restart-strategy";
    +		public static final String JOB_PARALLELISM = "job-parallelism";
    +		public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
    +		public static final String USER_CONFIG = "user-config";
    +
    +		public static final String ROOT_EXCEPTION = "root-exception";
    +		public static final String ALL_EXCEPTIONS = "all-exceptions";
    +		public static final String EXCEPTION = "exception";
    +		public static final String TRUNCATED = "truncated";
    +
    +		public static final String HOST = "host";
    +		public static final String LOCATION = "location";
    +
    +		public static final String VERTICES = "vertices";
    +		public static final String TASKS = "tasks";
    +		public static final String TASK = "task";
    +		public static final String SUBTASKS = "subtasks";
    +		public static final String SUBTASK = "subtask";
    +		public static final String ATTEMPT = "attempt";
    +
    +		public static final String STATUS = "status";
    +		public static final String TOTAL = "total";
    +		public static final String PENDING = "pending";
    +		public static final String RUNNING = "running";
    +		public static final String FINISHED = "finished";
    +		public static final String CANCELING = "canceling";
    +		public static final String CANCELED = "canceled";
    +		public static final String FAILED = "failed";
    +		public static final String RESTORED = "restored";
    +		public static final String PENDING_OR_FAILED = "pending_or_failed";
    +		public static final String DISCARDED = "discarded";
    +		public static final String IN_PROGRESS = "in_progress";
    +		public static final String COMPLETED = "completed";
    +
    +		public static final String METRICS = "metrics";
    +		public static final String WRITE_BYTES = "write-bytes";
    +		public static final String READ_BYTES = "read-bytes";
    +		public static final String WRITE_RECORDS = "write-records";
    +		public static final String READ_RECORDS = "read-records";
    +		public static final String TYPE = "type";
    +		public static final String VALUE = "value";
    +
    +		public static final String MIN = "min";
    +		public static final String MAX = "max";
    +		public static final String AVG = "avg";
    +
    +		public static final String JOB_ACCUMULATORS = "job-accumulators";
    +		public static final String USER_ACCUMULATORS = "user-accumulators";
    +		public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +		
    +		public static final String COUNTS = "counts";
    +		public static final String EXTERNALIZATION = "externalization";
    +		public static final String EXTERNAL_PATH = "external-path";
    +		public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
    +		public static final String HISTORY = "history";
    +
    +		public static final String SUMMARY = "summary";
    +		public static final String STATE_SIZE = "state_size";
    +		public static final String ETE_DURATION = "end_to_end_duration";
    +		public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
    +		public static final String SAVEPOINT = "savepoint";
    +		public static final String IS_SAVEPOINT = "is_savepoint";
    +		public static final String CHECKPOINT = "checkpoint";
    +		public static final String CHECKPOINT_DURATION = "checkpoint_duration";
    +		public static final String SYNC = "sync";
    +		public static final String ASYNC = "async";
    +		public static final String ALIGNMENT = "alignment";
    +		public static final String BUFFERED = "buffered";
    +		
    +		public static final String LATEST = "latest";
    +		
    +		public static final String FAILURE_TIMESTAMP = "failure_timestamp";
    +		public static final String FAILURE_MESSAGE = "failure_message";
    +		public static final String RESTORE_TIMESTAMP = "restore_timestamp";
    +		
    +		public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
    +		public static final String ACK_TIMESTAMP = "ack_timestamp";
    +		public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +		
    +		public static final String NUM_SUBTASKS = "num_subtasks";
    +		public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +		public static final String INDEX = "index";
    +		public static final String INTERVAL = "interval";
    +		public static final String ENABLED = "enabled";
    +		public static final String TIMEOUT = "timeout";
    +		public static final String MIN_PAUSE = "min_pause";
    +		public static final String MAX_CONCURRENT = "max_concurrent";
    +
    +		private Keys() {
    +		}
    +	}
    +
    +	public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator
gen) throws IOException {
    +		CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph),
gen, System.currentTimeMillis());
    +	}
    +
    +	public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws
IOException {
    +		gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
    +		gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
    +		gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
    +	}
    +
    +	public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state,
@Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID,
int subtaskIndex) {
    +		if (state.isTerminal()) {
    +			if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored
in ExecutionGraph
    +				summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
    +				summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
    +				summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
    +				summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
    +				summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
    +			}
    +		} else { // execAttempt is still running, use MetricQueryService instead
    +			if (fetcher != null) {
    +				fetcher.update();
    +				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID,
taskID, subtaskIndex);
    +				if (metrics != null) {
    +					summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL,
"0")));
    +					summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE,
"0")));
    +					summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT,
"0")));
    +					summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN,
"0")));
    +					summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT,
"0")));
    +				}
    +			}
    +		}
    +	}
    +
    +	public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException
{
    --- End diff --
    
    Move this to `MutableIOMetrics`


> Move JSON generation code into static methods
> ---------------------------------------------
>
>                 Key: FLINK-5852
>                 URL: https://issues.apache.org/jira/browse/FLINK-5852
>             Project: Flink
>          Issue Type: Improvement
>          Components: Webfrontend
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: 1.3.0
>
>
> In order to implement the HistoryServer we need a way to generate the JSON responses
> independently of the REST API. As such, I suggest moving the main parts of the generation code
> for job-specific handlers into static methods.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message