Repository: flink
Updated Branches:
refs/heads/master 0a286d0ff -> 9829ca00d
[FLINK-7704] [flip6] Add JobPlanHandler for new RestServerEndpoint
This closes #4768.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9829ca00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9829ca00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9829ca00
Branch: refs/heads/master
Commit: 9829ca00dff201879724847b498fe0432219cb53
Parents: 0a286d0
Author: yew1eb <yew1eb@gmail.com>
Authored: Wed Oct 4 01:07:49 2017 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Oct 10 18:44:06 2017 +0200
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 11 ++
.../rest/handler/job/JobPlanHandler.java | 61 ++++++++++
.../runtime/rest/messages/JobPlanHeaders.java | 71 ++++++++++++
.../runtime/rest/messages/JobPlanInfo.java | 113 +++++++++++++++++++
.../legacy/messages/JobPlanInfoTest.java | 41 +++++++
5 files changed, 297 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 2a2d9be..6297b41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
@@ -51,6 +52,7 @@ import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
@@ -186,6 +188,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executor,
checkpointStatsCache);
+ JobPlanHandler jobPlanHandler = new JobPlanHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ JobPlanHeaders.getInstance(),
+ executionGraphCache,
+ executor);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -210,6 +220,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
+ handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));
BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture,
leaderRetriever, timeout);
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
new file mode 100644
index 0000000..c8e6f8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -0,0 +1,61 @@
+package org.apache.flink.runtime.rest.handler.job;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * REST handler that serves a job's plan by wrapping the JSON plan of its {@link AccessExecutionGraph} in a {@link JobPlanInfo}.
+ */
+public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
{
+
+ public JobPlanHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor) {
+
+ super(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ messageHeaders,
+ executionGraphCache,
+ executor);
+ }
+
+ @Override
+ protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters>
request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+ return new JobPlanInfo(executionGraph.getJsonPlan());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
new file mode 100644
index 0000000..17204bb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobPlanHandler}: a GET on {@code /jobs/:jobid/plan} answered with a {@link JobPlanInfo} and status OK.
+ */
+public class JobPlanHeaders implements MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters>
{
+
+ private static final JobPlanHeaders INSTANCE = new JobPlanHeaders();
+
+ public static final String URL = "/jobs/:jobid/plan";
+
+ private JobPlanHeaders() {
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<JobPlanInfo> getResponseClass() {
+ return JobPlanInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static JobPlanHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
new file mode 100644
index 0000000..3987723
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobPlanHandler}; wraps a job's JSON plan as a string, which must not be null.
+ */
+@JsonSerialize(using = JobPlanInfo.Serializer.class)
+@JsonDeserialize(using = JobPlanInfo.Deserializer.class)
+public class JobPlanInfo implements ResponseBody {
+
+ private final String jsonPlan;
+
+ public JobPlanInfo(String jsonPlan) {
+ this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+ }
+
+ public String getJsonPlan() {
+ return jsonPlan;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobPlanInfo that = (JobPlanInfo) o;
+ return Objects.equals(jsonPlan, that.jsonPlan);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jsonPlan);
+ }
+
+ //---------------------------------------------------------------------------------
+ // Static helper classes
+ //---------------------------------------------------------------------------------
+
+ /**
+ * JSON serializer for the {@link JobPlanInfo}; writes the plan as one JSON string value, not as embedded raw JSON.
+ */
+ public static final class Serializer extends StdSerializer<JobPlanInfo> {
+
+ private static final long serialVersionUID = -1551666039618928811L;
+
+ public Serializer() {
+ super(JobPlanInfo.class);
+ }
+
+ @Override
+ public void serialize(
+ JobPlanInfo jobPlanInfo,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException {
+ jsonGenerator.writeString(jobPlanInfo.getJsonPlan());
+ }
+ }
+
+ /**
+ * JSON deserializer for the {@link JobPlanInfo}; rebuilds it from the text of the parser's current token.
+ */
+ public static final class Deserializer extends StdDeserializer<JobPlanInfo> {
+
+ private static final long serialVersionUID = -3580088509877177213L;
+
+ public Deserializer() {
+ super(JobPlanInfo.class);
+ }
+
+ @Override
+ public JobPlanInfo deserialize(
+ JsonParser jsonParser,
+ DeserializationContext deserializationContext) throws IOException {
+ final String jsonPlan = jsonParser.getText();
+ return new JobPlanInfo(jsonPlan);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
new file mode 100644
index 0000000..1fe51d0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+
+/**
+ * Tests that a {@link JobPlanInfo} carrying a JSON plan string can be marshalled and unmarshalled.
+ */
+public class JobPlanInfoTest extends RestResponseMarshallingTestBase<JobPlanInfo> {
+
+ @Override
+ protected Class<JobPlanInfo> getTestResponseClass() {
+ return JobPlanInfo.class;
+ }
+
+ @Override
+ protected JobPlanInfo getTestResponseInstance() {
+ JobID jobID = new JobID();
+ String jobName = "job_007";
+ String jsonPlan = "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}";
+ return new JobPlanInfo(jsonPlan);
+ }
+}
|