flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/3] flink git commit: [FLINK-7072] [REST] Define protocol for job submit/cancel/stop
Date Mon, 09 Oct 2017 17:11:53 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
new file mode 100644
index 0000000..257b241
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * <p>We currently require the job-jars to be uploaded through the blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+	private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+
+	/**
+	 * The serialized job graph.
+	 */
+	@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+	public final byte[] serializedJobGraph;
+
+	public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
+		this(serializeJobGraph(jobGraph));
+	}
+
+	@JsonCreator
+	public JobSubmitRequestBody(
+		@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) {
+
+		// check that job graph can be read completely by the HttpObjectAggregator on the server
+		// we subtract 1024 bytes to account for http headers and such.
+		if (serializedJobGraph.length > RestServerEndpoint.MAX_REQUEST_SIZE_BYTES - 1024) {
+			throw new IllegalArgumentException("Serialized job graph exceeded max request size.");
+		}
+		this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph);
+	}
+
+	@Override
+	public int hashCode() {
+		return 71 * Arrays.hashCode(this.serializedJobGraph);
+	}
+
+	@Override
+	public boolean equals(Object object) {
+		if (object instanceof JobSubmitRequestBody) {
+			JobSubmitRequestBody other = (JobSubmitRequestBody) object;
+			return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph);
+		}
+		return false;
+	}
+
+	private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
+		try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) {
+			ObjectOutputStream out = new ObjectOutputStream(baos);
+
+			out.writeObject(jobGraph);
+
+			return baos.toByteArray();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java
new file mode 100644
index 0000000..fefd435
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response to the submission of a job, containing a URL from which the status of the job
can be retrieved from.
+ */
+public final class JobSubmitResponseBody implements ResponseBody {
+
+	public static final String FIELD_NAME_JOB_URL = "jobUrl";
+
+	/**
+	 * The URL under which the job status can monitored.
+	 */
+	@JsonProperty(FIELD_NAME_JOB_URL)
+	public final String jobUrl;
+
+	@JsonCreator
+	public JobSubmitResponseBody(
+		@JsonProperty(FIELD_NAME_JOB_URL) String jobUrl) {
+
+		this.jobUrl = jobUrl;
+	}
+
+	@Override
+	public int hashCode() {
+		return 73 * jobUrl.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object object) {
+		if (object instanceof JobSubmitResponseBody) {
+			JobSubmitResponseBody other = (JobSubmitResponseBody) object;
+			return Objects.equals(this.jobUrl, other.jobUrl);
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
new file mode 100644
index 0000000..0ea18db
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link BlobServerPortHandler}.
+ */
+public class BlobServerPortHandlerTest extends TestLogger {
+	private static final int PORT = 64;
+
+	@Test
+	public void testPortRetrieval() throws Exception {
+		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+		when(mockGateway.getBlobServerPort(any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(PORT));
+		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+		BlobServerPortHandler handler = new BlobServerPortHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		BlobServerPortResponseBody portResponse = handler
+			.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()),
mockGateway)
+			.get();
+
+		Assert.assertEquals(PORT, portResponse.port);
+	}
+
+	@Test
+	public void testPortRetrievalFailureHandling() throws Exception {
+		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+		when(mockGateway.getBlobServerPort(any(Time.class)))
+			.thenReturn(FutureUtils.completedExceptionally(new TestException()));
+		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+		BlobServerPortHandler handler = new BlobServerPortHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		try {
+			handler
+				.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()),
mockGateway)
+				.get();
+			Assert.fail();
+		} catch (ExecutionException ee) {
+			RestHandlerException rhe = (RestHandlerException) ee.getCause();
+
+			Assert.assertEquals(TestException.class, rhe.getCause().getClass());
+			Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, rhe.getHttpResponseStatus());
+		}
+	}
+
+	private static class TestException extends Exception {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
new file mode 100644
index 0000000..1196d40
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link JobSubmitHandler}.
+ */
+public class JobSubmitHandlerTest extends TestLogger {
+
+	@Test
+	public void testSerializationFailureHandling() throws Exception {
+		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+		JobSubmitHandler handler = new JobSubmitHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]);
+
+		try {
+			handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()),
mockGateway);
+			Assert.fail();
+		} catch (RestHandlerException rhe) {
+			Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rhe.getHttpResponseStatus());
+		}
+	}
+
+	@Test
+	public void testSuccessfulJobSubmission() throws Exception {
+		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+		JobSubmitHandler handler = new JobSubmitHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		JobGraph job = new JobGraph("testjob");
+		JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+
+		handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()),
mockGateway)
+			.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
new file mode 100644
index 0000000..add4e3b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
+
+/**
+ * Tests for {@link BlobServerPortResponseBody}.
+ */
+public class BlobServerPortResponseTest extends RestResponseMarshallingTestBase<BlobServerPortResponseBody>
{
+
+	@Override
+	protected Class<BlobServerPortResponseBody> getTestResponseClass() {
+		return BlobServerPortResponseBody.class;
+	}
+
+	@Override
+	protected BlobServerPortResponseBody getTestResponseInstance() throws Exception {
+		return new BlobServerPortResponseBody(64);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
new file mode 100644
index 0000000..e69913c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.legacy.messages.RestRequestMarshallingTestBase;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+
+import java.io.IOException;
+
+/**
+ * Tests for the {@link JobSubmitRequestBody}.
+ */
+public class JobSubmitRequestBodyTest extends RestRequestMarshallingTestBase<JobSubmitRequestBody>
{
+
+	@Override
+	protected Class<JobSubmitRequestBody> getTestRequestClass() {
+		return JobSubmitRequestBody.class;
+	}
+
+	@Override
+	protected JobSubmitRequestBody getTestRequestInstance() throws IOException {
+		return new JobSubmitRequestBody(new JobGraph("job"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
new file mode 100644
index 0000000..9dc832a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+/**
+ * Tests for {@link JobSubmitResponseBody}.
+ */
+public class JobSubmitResponseBodyTest extends RestResponseMarshallingTestBase<JobSubmitResponseBody>
{
+
+	@Override
+	protected Class<JobSubmitResponseBody> getTestResponseClass() {
+		return JobSubmitResponseBody.class;
+	}
+
+	@Override
+	protected JobSubmitResponseBody getTestResponseInstance() throws Exception {
+		return new JobSubmitResponseBody("/url");
+	}
+}


Mime
View raw message