flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7704] [flip6] Add JobPlanHandler for new RestServerEndpoint
Date Tue, 10 Oct 2017 21:03:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0a286d0ff -> 9829ca00d


[FLINK-7704] [flip6] Add JobPlanHandler for new RestServerEndpoint

This closes #4768.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9829ca00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9829ca00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9829ca00

Branch: refs/heads/master
Commit: 9829ca00dff201879724847b498fe0432219cb53
Parents: 0a286d0
Author: yew1eb <yew1eb@gmail.com>
Authored: Wed Oct 4 01:07:49 2017 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Oct 10 18:44:06 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  11 ++
 .../rest/handler/job/JobPlanHandler.java        |  61 ++++++++++
 .../runtime/rest/messages/JobPlanHeaders.java   |  71 ++++++++++++
 .../runtime/rest/messages/JobPlanInfo.java      | 113 +++++++++++++++++++
 .../legacy/messages/JobPlanInfoTest.java        |  41 +++++++
 5 files changed, 297 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 2a2d9be..6297b41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
@@ -51,6 +52,7 @@ import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
@@ -186,6 +188,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executor,
 			checkpointStatsCache);
 
+		JobPlanHandler jobPlanHandler = new JobPlanHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			JobPlanHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -210,6 +220,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
 		handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
 		handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
+		handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));
 
 		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture,
leaderRetriever, timeout);
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
new file mode 100644
index 0000000..c8e6f8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -0,0 +1,61 @@
+package org.apache.flink.runtime.rest.handler.job;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler serving the job execution plan.
+ */
+public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
{
+
+	public JobPlanHandler(
+		CompletableFuture<String> localRestAddress,
+		GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+		Time timeout,
+		MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> messageHeaders,
+		ExecutionGraphCache executionGraphCache,
+		Executor executor) {
+
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			messageHeaders,
+			executionGraphCache,
+			executor);
+	}
+
+	@Override
+	protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters>
request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return new JobPlanInfo(executionGraph.getJsonPlan());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
new file mode 100644
index 0000000..17204bb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobPlanHandler}.
+ */
+public class JobPlanHeaders implements MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters>
{
+
+	private static final JobPlanHeaders INSTANCE = new JobPlanHeaders();
+
+	public static final String URL = "/jobs/:jobid/plan";
+
+	private JobPlanHeaders() {
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobPlanInfo> getResponseClass() {
+		return JobPlanInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static JobPlanHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
new file mode 100644
index 0000000..3987723
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobPlanHandler}.
+ */
+@JsonSerialize(using = JobPlanInfo.Serializer.class)
+@JsonDeserialize(using = JobPlanInfo.Deserializer.class)
+public class JobPlanInfo implements ResponseBody {
+
+	private final String jsonPlan;
+
+	public JobPlanInfo(String jsonPlan) {
+		this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+	}
+
+	public String getJsonPlan() {
+		return jsonPlan;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobPlanInfo that = (JobPlanInfo) o;
+		return Objects.equals(jsonPlan, that.jsonPlan);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(jsonPlan);
+	}
+
+	//---------------------------------------------------------------------------------
+	// Static helper classes
+	//---------------------------------------------------------------------------------
+
+	/**
+	 * Json serializer for the {@link JobPlanInfo}.
+	 */
+	public static final class Serializer extends StdSerializer<JobPlanInfo> {
+
+		private static final long serialVersionUID = -1551666039618928811L;
+
+		public Serializer() {
+			super(JobPlanInfo.class);
+		}
+
+		@Override
+		public void serialize(
+			JobPlanInfo jobPlanInfo,
+			JsonGenerator jsonGenerator,
+			SerializerProvider serializerProvider) throws IOException {
+			jsonGenerator.writeString(jobPlanInfo.getJsonPlan());
+		}
+	}
+
+	/**
+	 * Json deserializer for the {@link JobPlanInfo}.
+	 */
+	public static final class Deserializer extends StdDeserializer<JobPlanInfo> {
+
+		private static final long serialVersionUID = -3580088509877177213L;
+
+		public Deserializer() {
+			super(JobPlanInfo.class);
+		}
+
+		@Override
+		public JobPlanInfo deserialize(
+			JsonParser jsonParser,
+			DeserializationContext deserializationContext) throws IOException {
+			final String jsonPlan = jsonParser.getText();
+			return new JobPlanInfo(jsonPlan);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
new file mode 100644
index 0000000..1fe51d0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+
+/**
+ * Tests that the {@link JobPlanInfo} can be marshalled and unmarshalled.
+ */
+public class JobPlanInfoTest extends RestResponseMarshallingTestBase<JobPlanInfo> {
+
+	@Override
+	protected Class<JobPlanInfo> getTestResponseClass() {
+		return JobPlanInfo.class;
+	}
+
+	@Override
+	protected JobPlanInfo getTestResponseInstance() {
+		JobID jobID = new JobID();
+		String jobName = "job_007";
+		String jsonPlan = "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}";
+		return new JobPlanInfo(jsonPlan);
+	}
+}


Mime
View raw message