flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
Date Tue, 22 Aug 2017 14:59:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136899#comment-16136899
] 

ASF GitHub Bot commented on FLINK-7040:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134507274
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
---
    @@ -155,38 +129,66 @@ public void shutdown() {
     			.set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
     			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
     
    -		synchronized (this) {
    -			// This ensures strict sequential processing of requests.
    -			// If we send new requests immediately we can no longer make assumptions about the
order in which responses
    -			// arrive, due to which the handler cannot know which future he should complete (not
to mention what response
    -			// type to read).
    -			CompletableFuture<P> nextFuture = lastFuture
    -				.handleAsync((f, e) -> submitRequest(httpRequest, messageHeaders), directExecutor)
    -				.thenCompose((future) -> future);
    -
    -			lastFuture = nextFuture;
    -			return nextFuture;
    -		}
    +		return submitRequest(httpRequest, messageHeaders);
     	}
     
     	private <M extends MessageHeaders<R, P, U>, U extends ParameterMapper, R extends
RequestBody, P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest
httpRequest, M messageHeaders) {
    -		CompletableFuture<P> responseFuture = handler.expectResponse(messageHeaders.getResponseClass());
    -
    -		try {
    -			// write request
    -			Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel();
    -			channel.writeAndFlush(httpRequest);
    -			channel.closeFuture();
    -		} catch (InterruptedException e) {
    -			return FutureUtils.completedExceptionally(e);
    +		synchronized (lock) {
    +			CompletableFuture<P> responseFuture = ClientHandler.addHandlerForResponse(bootstrap,
sslEngine, messageHeaders.getResponseClass());
    +
    +			try {
    +				// write request
    +				Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel();
    +				channel.writeAndFlush(httpRequest);
    +				channel.closeFuture();
    +			} catch (InterruptedException e) {
    +				return FutureUtils.completedExceptionally(e);
    +			}
    +			return responseFuture;
     		}
    -		return responseFuture;
     	}
     
    -	@ChannelHandler.Sharable
    -	private static class ClientHandler extends SimpleChannelInboundHandler<Object>
{
    +	private static class RestChannelInitializer extends ChannelInitializer<SocketChannel>
{
     
    -		private volatile ExpectedResponse<? extends ResponseBody> expectedResponse =
null;
    +		private final SSLEngine sslEngine;
    +		private final ClientHandler handler;
    +
    +		public RestChannelInitializer(SSLEngine sslEngine, ClientHandler handler) {
    +			this.sslEngine = sslEngine;
    +			this.handler = handler;
    +		}
    +
    +		@Override
    +		protected void initChannel(SocketChannel ch) throws Exception {
    +			// SSL should be the first handler in the pipeline
    +			if (sslEngine != null) {
    +				ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
    +			}
    +
    +			ch.pipeline()
    +				.addLast(new HttpClientCodec())
    +				.addLast(new HttpObjectAggregator(1024 * 1024))
    +				.addLast(handler)
    +				.addLast(new PipelineErrorHandler(LOG));
    +		}
    +	}
    +
    +	private static class ClientHandler<P extends ResponseBody> extends SimpleChannelInboundHandler<Object>
{
    +
    +		private final ExpectedResponse<P> expectedResponse;
    +
    +		private ClientHandler(ExpectedResponse<P> expectedResponse) {
    +			this.expectedResponse = expectedResponse;
    +		}
    +
    +		static <P extends ResponseBody> CompletableFuture<P> addHandlerForResponse(Bootstrap
bootStrap, SSLEngine sslEngine, Class<P> expectedResponse) {
    +			CompletableFuture<P> responseFuture = new CompletableFuture<>();
    +
    +			ClientHandler handler = new ClientHandler<>(new ExpectedResponse<>(expectedResponse,
responseFuture));
    +			bootStrap.handler(new RestChannelInitializer(sslEngine, handler));
    +
    +			return responseFuture;
    +		}
     
     		@Override
     		protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
{
    --- End diff --
    
    Then we should find it out.


> Flip-6 client-cluster communication
> -----------------------------------
>
>                 Key: FLINK-7040
>                 URL: https://issues.apache.org/jira/browse/FLINK-7040
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management, Mesos
>            Reporter: Till Rohrmann
>            Assignee: Chesnay Schepler
>            Priority: Critical
>              Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the cluster in a RESTful
manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the savepoint under
which the savepoint was stored? Maybe always having to specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new notifications from
the execution of the given job/Opens WebSocket to receive notifications
> The first four REST calls will be served by the REST endpoint running in the application
master/cluster entrypoint. The other calls will be served by a REST endpoint running along
side to the JobManager.
> Detailed information about different implementations and their pros and cons can be found
in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message