Date: Tue, 22 Aug 2017 12:43:04 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

    [ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136721#comment-16136721 ]

ASF GitHub Bot commented on FLINK-7040:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134459773

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java ---
    @@ -0,0 +1,263 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.ParameterMapper;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
    +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
    +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
    +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    +import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
    +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
    +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
    +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
    +
    +import com.fasterxml.jackson.core.JsonParseException;
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.JsonMappingException;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.net.ssl.SSLEngine;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.StringWriter;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * This client is the counter-part to the {@link RestServerEndpoint}.
    + */
    +public class RestClientEndpoint {
    +    private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
    +
    +    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
    +
    +    private final String configuredTargetAddress;
    +    private final int configuredTargetPort;
    +    private final SSLEngine sslEngine;
    +
    +    private Bootstrap bootstrap;
    +
    +    private final ClientHandler handler = new ClientHandler();
    +
    +    private CompletableFuture lastFuture = CompletableFuture.completedFuture(null);
    +
    +    private final Executor directExecutor = Executors.directExecutor();
    +
    +    public RestClientEndpoint(RestClientEndpointConfiguration configuration) {
    +        this.configuredTargetAddress = configuration.getTargetRestEndpointAddress();
    +        this.configuredTargetPort = configuration.getTargetRestEndpointPort();
    +        this.sslEngine = configuration.getSslEngine();
    +    }
    +
    +    public void start() {
    +        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
    +
    +            @Override
    +            protected void initChannel(SocketChannel ch) {
    +
    +                // SSL should be the first handler in the pipeline
    +                if (sslEngine != null) {
    +                    ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
    +                }
    +
    +                ch.pipeline()
    +                    .addLast(new HttpClientCodec())
    +                    .addLast(new HttpObjectAggregator(1024 * 1024))
    +                    .addLast(handler)
    +                    .addLast(new PipelineErrorHandler(LOG));
    +            }
    +        };
    +
    +        NioEventLoopGroup group = new NioEventLoopGroup(1);
    +
    +        bootstrap = new Bootstrap();
    +        bootstrap
    +            .group(group)
    +            .channel(NioSocketChannel.class)
    +            .handler(initializer);
    +
    +        LOG.info("Rest client endpoint started.");
    +    }
    +
    +    public void shutdown() {
    +        if (bootstrap != null) {
    +            if (bootstrap.group() != null) {
    +                bootstrap.group().shutdownGracefully();
    +            }
    +        }
    +    }
    +
    +    public <M extends MessageHeaders<R, P, U>, U extends ParameterMapper, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U urlResolver, R request) throws IOException {
    +        Preconditions.checkNotNull(messageHeaders);
    +        Preconditions.checkNotNull(request);
    +
    +        String targetUrl = ParameterMapper.resolveUrl(
    +            messageHeaders.getTargetRestEndpointURL(),
    +            urlResolver.mapPathParameters(messageHeaders.getPathParameters()),
    +            urlResolver.mapQueryParameters(messageHeaders.getQueryParameters())
    +        );
    +
    +        LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
    +        // serialize payload
    +        StringWriter sw = new StringWriter();
    +        objectMapper.writeValue(sw, request);
    +        ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +        // create request and set headers
    +        FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
    +        httpRequest.headers()
    +            .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
    +            .add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
    +            .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
    +            .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
    +
    +        synchronized (this) {
    +            // This ensures strict sequential processing of requests.
    +            // If we send new requests immediately we can no longer make assumptions about the order in which responses
    +            // arrive, due to which the handler cannot know which future he should complete (not to mention what response
    +            // type to read).
    +            CompletableFuture<P> nextFuture = lastFuture
    +                .handleAsync((f, e) -> submitRequest(httpRequest, messageHeaders), directExecutor)
    +                .thenCompose((future) -> future);
    +
    +            lastFuture = nextFuture;
    +            return nextFuture;
    +        }
    +    }
    +
    +    private <M extends MessageHeaders<R, P, U>, U extends ParameterMapper, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, M messageHeaders) {
    +        CompletableFuture<P> responseFuture = handler.expectResponse(messageHeaders.getResponseClass());
    +
    +        try {
    +            // write request
    +            Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel();
    +            channel.writeAndFlush(httpRequest);
    +            channel.closeFuture();
    +        } catch (InterruptedException e) {
    +            return FutureUtils.completedExceptionally(e);
    +        }
    +        return responseFuture;
    +    }
    +
    +    @ChannelHandler.Sharable
    +    private static class ClientHandler extends SimpleChannelInboundHandler {
    +
    +        private volatile ExpectedResponse expectedResponse = null;
    +
    +        @Override
    +        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    +            if (msg instanceof FullHttpResponse) {
    +                completeFuture((FullHttpResponse) msg);
    +            } else {
    +                LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
    +            }
    +        }
    +
    +        private void completeFuture(FullHttpResponse msg) throws IOException {
    +            ByteBuf content = msg.content();
    +
    +            JsonNode rawResponse;
    +            try {
    +                InputStream in = new ByteBufInputStream(content);
    +                rawResponse = objectMapper.readTree(in);
    +                LOG.debug("Received response {}.", rawResponse);
    +            } catch (JsonMappingException | JsonParseException je) {
    +                LOG.error("Response was not valid JSON.", je);
    +                return;
    +            }
    +
    +            final ExpectedResponse expectedResponse = (ExpectedResponse) this.expectedResponse;
    +            this.expectedResponse = null;
    +
    +            if (expectedResponse == null) {
    +                LOG.error("Received response, but we didn't expect any. Response={}", rawResponse);
    +                return;
    +            }
    +
    +            try {
    --- End diff --

    Not sure why we need the outer try-catch block?

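For readers following the `synchronized` block in the quoted diff: the idea of chaining each request onto the future of the previous one can be shown in isolation. The following is only a minimal sketch, not code from the pull request. `SequentialSubmitter` and `SequentialSubmitterDemo` are hypothetical names, and it uses plain `handle` where the diff uses `handleAsync` with a direct executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

/** Hypothetical helper (not part of the pull request) that serializes asynchronous requests. */
final class SequentialSubmitter {
    // Tail of the chain; starts out already completed so the first request runs at once.
    private CompletableFuture<?> lastFuture = CompletableFuture.completedFuture(null);

    /** Starts {@code request} only after every previously submitted request has completed. */
    synchronized <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request) {
        CompletableFuture<T> next = lastFuture
            // handle() fires whether the previous request succeeded or failed
            .handle((ignoredResult, ignoredError) -> request.get())
            // flatten CompletableFuture<CompletableFuture<T>> into CompletableFuture<T>
            .thenCompose(future -> future);
        lastFuture = next;
        return next;
    }
}

final class SequentialSubmitterDemo {
    public static void main(String[] args) {
        SequentialSubmitter submitter = new SequentialSubmitter();
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<String> firstResponse = new CompletableFuture<>();

        // The first request starts immediately but its response is still pending.
        submitter.submit(() -> { log.add("start 1"); return firstResponse; });
        // The second request is queued behind the first one.
        CompletableFuture<String> second = submitter.submit(
            () -> { log.add("start 2"); return CompletableFuture.completedFuture("b"); });

        log.add("complete 1");
        firstResponse.complete("a"); // only now is the second request started
        System.out.println(log + " -> " + second.join());
    }
}
```

Because at most one request is in flight at a time, a single shared handler can safely assume that the next response belongs to the most recently started request, which is the assumption the code comment in the diff describes.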
> Flip-6 client-cluster communication
> -----------------------------------
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management, Mesos
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the path under which the savepoint was stored? Maybe always having to specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new notifications from the execution of the given job/Opens WebSocket to receive notifications
> The first four REST calls will be served by the REST endpoint running in the application master/cluster entrypoint. The other calls will be served by a REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
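The REST calls listed in the issue description can be summarised as a small lookup table. This is only an illustrative sketch: the enum and constant names are assumptions made here, and no URL paths are given because the issue does not specify any. The HTTP methods and the split between the two serving endpoints follow the issue text.

```java
import java.util.ArrayList;
import java.util.List;

/** Where a call is served, according to the issue description. */
enum ServingEndpoint { CLUSTER_ENTRYPOINT, JOB_MANAGER }

/** Constant names are illustrative assumptions; methods and endpoints follow the issue text. */
enum RestCall {
    LIST_JOBS("GET", ServingEndpoint.CLUSTER_ENTRYPOINT),
    SUBMIT_JOB("POST", ServingEndpoint.CLUSTER_ENTRYPOINT), // session mode only
    LOOKUP_JOB_LEADER("GET", ServingEndpoint.CLUSTER_ENTRYPOINT),
    GET_JOB_STATUS("GET", ServingEndpoint.CLUSTER_ENTRYPOINT),
    CANCEL_JOB("PUT", ServingEndpoint.JOB_MANAGER),
    STOP_JOB("PUT", ServingEndpoint.JOB_MANAGER),
    TAKE_SAVEPOINT("POST", ServingEndpoint.JOB_MANAGER),
    GET_KV_STATE("GET", ServingEndpoint.JOB_MANAGER),
    POLL_NOTIFICATIONS("GET", ServingEndpoint.JOB_MANAGER); // alternatively a WebSocket subscription

    final String httpMethod;
    final ServingEndpoint servedBy;

    RestCall(String httpMethod, ServingEndpoint servedBy) {
        this.httpMethod = httpMethod;
        this.servedBy = servedBy;
    }

    /** Returns all calls handled by the given endpoint, in declaration order. */
    static List<RestCall> callsServedBy(ServingEndpoint endpoint) {
        List<RestCall> calls = new ArrayList<>();
        for (RestCall call : values()) {
            if (call.servedBy == endpoint) {
                calls.add(call);
            }
        }
        return calls;
    }
}

final class RestCallDemo {
    public static void main(String[] args) {
        // Print one line per call: HTTP method, call name, serving endpoint.
        for (RestCall call : RestCall.values()) {
            System.out.println(call.httpMethod + " " + call + " @ " + call.servedBy);
        }
    }
}
```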