flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
Date Thu, 12 Oct 2017 19:03:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202468#comment-16202468
] 

ASF GitHub Bot commented on FLINK-7738:
---------------------------------------

Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r144381682
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractWebSocketHandler.java
---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageParameters;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.WebSocketSpecification;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.concurrent.CompletableFuture;
    +
    +/**
    + * A channel handler for WebSocket resources.
    + *
    + * <p>This handler handles handshaking and ongoing messaging with a WebSocket client,
    + * based on a {@link WebSocketSpecification} that describes the REST resource location,
    + * parameter type, and message inbound/outbound types.  Messages are automatically
    + * encoded from (and decoded to) JSON text.
    + *
    + * <p>Subclasses should override the following methods to extend the respective
phases.
    + * <ol>
    + *     <li>{@code handshakeInitiated} - occurs upon receipt of a handshake request
from an HTTP client.  Useful for parameter validation.</li>
    + *     <li>{@code handshakeCompleted} - occurs upon successful completion; WebSocket
is ready for I/O.</li>
    + *     <li>{@code messageReceived}: occurs when a WebSocket message is received
on the channel.</li>
    + * </ol>
    + *
    + * <p>The handler supports gateway availability announcements.
    + *
    + * @param <T> The gateway type.
    + * @param <M> The REST parameter type.
    + * @param <O> The outbound message type.
    + * @param <I> The inbound message type.
    + */
    +public abstract class AbstractWebSocketHandler<T extends RestfulGateway, M extends
MessageParameters, O extends RequestBody, I extends ResponseBody> extends ChannelInboundHandlerAdapter
{
    +
    +	protected final Logger log = LoggerFactory.getLogger(getClass());
    +
    +	private final RedirectHandler redirectHandler;
    +
    +	private final AttributeKey<T> gatewayAttr;
    +
    +	private final WebSocketSpecification<M, O, I> specification;
    +
    +	private final ChannelHandler messageCodec;
    +
    +	private final AttributeKey<M> parametersAttr;
    +
    +	/**
    +	 * Creates a new handler.
    +	 */
    +	public AbstractWebSocketHandler(
    +		@Nonnull CompletableFuture<String> localAddressFuture,
    +		@Nonnull GatewayRetriever<? extends T> leaderRetriever,
    +		@Nonnull Time timeout,
    +		@Nonnull WebSocketSpecification<M, O, I> specification) {
    +		this.redirectHandler = new RedirectHandler<>(localAddressFuture, leaderRetriever,
timeout);
    +		this.gatewayAttr = AttributeKey.valueOf("gateway");
    +		this.specification = specification;
    +		this.messageCodec = new JsonWebSocketMessageCodec<>(specification.getInboundClass(),
specification.getOutboundClass());
    +		this.parametersAttr = AttributeKey.valueOf("parameters");
    +	}
    +
    +	/**
    +	 * Sets the gateway associated with the channel.
    +	 */
    +	private void setGateway(ChannelHandlerContext ctx, T gateway) {
    +		ctx.attr(gatewayAttr).set(gateway);
    +	}
    +
    +	/**
    +	 * Returns the gateway associated with the channel.
    +	 */
    +	public T getGateway(ChannelHandlerContext ctx) {
    +		T t = ctx.attr(gatewayAttr).get();
    +		Preconditions.checkState(t != null, "Gateway is not available.");
    +		return t;
    +	}
    +
    +	/**
    +	 * Sets the resource parameters associated with the channel.
    +	 *
    +	 * <p>The parameters are established by the WebSocket handshake request.
    +	 */
    +	private void setMessageParameters(ChannelHandlerContext ctx, M parameters) {
    +		ctx.attr(parametersAttr).set(parameters);
    +	}
    +
    +	/**
    +	 * Returns the resource parameters associated with the channel.
    +	 */
    +	public M getMessageParameters(ChannelHandlerContext ctx) {
    +		M o = ctx.attr(parametersAttr).get();
    +		Preconditions.checkState(o != null, "Message parameters are not available.");
    +		return o;
    +	}
    +
    +	@Override
    +	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    +		if (ctx.pipeline().get(RedirectHandler.class.getName()) == null) {
    +			ctx.pipeline().addBefore(ctx.name(), RedirectHandler.class.getName(), redirectHandler);
    +		}
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
{
    +		if (evt instanceof RedirectHandler.GatewayRetrieved) {
    +			T gateway = ((RedirectHandler.GatewayRetrieved<T>) evt).getGateway();
    +			setGateway(ctx, gateway);
    +			log.debug("Gateway retrieved: {}", gateway);
    +		}
    +		else if (evt instanceof WebSocketServerProtocolHandler.ServerHandshakeStateEvent) {
    +			WebSocketServerProtocolHandler.ServerHandshakeStateEvent handshakeEvent = (WebSocketServerProtocolHandler.ServerHandshakeStateEvent)
evt;
    +			if (handshakeEvent == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE)
{
    +				log.debug("Handshake completed with client IP: {}", ctx.channel().remoteAddress());
    +				M parameters = getMessageParameters(ctx);
    +				handshakeCompleted(ctx, parameters);
    +			}
    +		}
    +
    +		super.userEventTriggered(ctx, evt);
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public void channelRead(ChannelHandlerContext ctx, Object o) throws Exception {
    +		if (specification.getInboundClass().isAssignableFrom(o.getClass())) {
    +			// process an inbound message
    +			M parameters = getMessageParameters(ctx);
    +			messageReceived(ctx, parameters, (O) o);
    +			return;
    +		}
    +
    +		if (!(o instanceof Routed)) {
    +			// a foreign message
    +			ctx.fireChannelRead(o);
    +			return;
    +		}
    +
    +		// process an inbound HTTP request
    +		Routed request = (Routed) o;
    +
    +		// parse the REST request parameters
    +		M messageParameters = specification.getUnresolvedMessageParameters();
    +		try {
    +			messageParameters.resolveParameters(request.pathParams(), request.queryParams());
    +			if (!messageParameters.isResolved()) {
    +				throw new IllegalArgumentException("One or more mandatory parameters is missing.");
    +			}
    +		}
    +		catch (IllegalArgumentException e) {
    +			HandlerUtils.sendErrorResponse(
    +				ctx,
    +				request.request(),
    +				new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s",
e.getMessage())),
    +				HttpResponseStatus.BAD_REQUEST);
    +			ReferenceCountUtil.release(request);
    +			return;
    +		}
    +		setMessageParameters(ctx, messageParameters);
    +
    +		// validate the inbound handshake request with the subclass
    +		CompletableFuture<Void> handshakeReady;
    +		try {
    +			handshakeReady = handshakeInitiated(ctx, messageParameters);
    +		} catch (Exception e) {
    +			handshakeReady = FutureUtils.completedExceptionally(e);
    +		}
    +		handshakeReady.whenCompleteAsync((Void v, Throwable throwable) -> {
    +			try {
    +				if (throwable != null) {
    +					Throwable error = ExceptionUtils.stripCompletionException(throwable);
    +					if (error instanceof RestHandlerException) {
    +						final RestHandlerException rhe = (RestHandlerException) error;
    +						log.error("Exception occurred in REST handler.", error);
    +						HandlerUtils.sendErrorResponse(ctx, request.request(), new ErrorResponseBody(rhe.getMessage()),
rhe.getHttpResponseStatus());
    +					} else {
    +						log.error("Implementation error: Unhandled exception.", error);
    +						HandlerUtils.sendErrorResponse(ctx, request.request(), new ErrorResponseBody("Internal
server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
    +					}
    +				} else {
    +					upgradeToWebSocket(ctx, request);
    +				}
    +			}
    +			finally {
    +				ReferenceCountUtil.release(request);
    +			}
    +		}, ctx.executor());
    +	}
    +
    +	private void upgradeToWebSocket(final ChannelHandlerContext ctx, Routed msg) {
    +
    +		// store the context of the handler that precedes the current handler,
    +		// to use that context later to forward the HTTP request to the WebSocket protocol
handler
    +		String before = ctx.pipeline().names().get(ctx.pipeline().names().indexOf(ctx.name())
- 1);
    +		ChannelHandlerContext beforeCtx = ctx.pipeline().context(before);
    +
    +		// inject the websocket protocol handler into this channel, to be active
    +		// until the channel is closed.  note that the handshake may or may not complete synchronously.
    +		ctx.pipeline().addBefore(ctx.name(), WebSocketServerProtocolHandler.class.getName(),
    +			new WebSocketServerProtocolHandler(msg.path(), specification.getSubprotocol()));
    +
    +		// inject the message codec
    +		ctx.pipeline().addBefore(ctx.name(), messageCodec.getClass().getName(), messageCodec);
    +
    +		log.debug("Upgraded channel with WS protocol handler and message codec.");
    +
    +		// forward the message to the installed protocol handler to initiate handshaking
    +		HttpRequest request = msg.request();
    +		ReferenceCountUtil.retain(request);
    +		beforeCtx.fireChannelRead(request);
    +	}
    +
    +	@Override
    +	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
    +		log.error("WebSocket channel error; closing the channel.", cause);
    +		ctx.close();
    +	}
    +
    +	/**
    +	 * Handles a client handshake request to open a WebSocket resource.  Returns a {@link
CompletableFuture} to complete handshaking.
    +	 *
    +	 * <p>Implementations may decide whether to throw {@link RestHandlerException}s
or fail the returned
    +	 * {@link CompletableFuture} with a {@link RestHandlerException}.
    +	 *
    +	 * <p>Failing the future with another exception type or throwing unchecked exceptions
is regarded as an
    +	 * implementation error as it does not allow us to provide a meaningful HTTP status
code. In this case a
    +	 * {@link HttpResponseStatus#INTERNAL_SERVER_ERROR} will be returned.
    +	 *
    +	 * @param parameters the REST parameters
    +
    +	 * @return future indicating completion of handshake pre-processing.
    +	 * @throws RestHandlerException to produce a pre-formatted HTTP error response.
    +	 */
    +	protected CompletableFuture<Void> handshakeInitiated(ChannelHandlerContext ctx,
M parameters) throws Exception {
    +		return CompletableFuture.completedFuture(null);
    +	}
    +
    +	/**
    +	 * Invoked when the current channel has completed the handshaking to establish a WebSocket
connection.
    +	 *
    +	 * @param ctx the channel handler context
    +	 * @param parameters the REST parameters
    +	 * @throws Exception if processing failed.
    +	 */
    +	protected void handshakeCompleted(ChannelHandlerContext ctx, M parameters) throws Exception
{
    +	}
    +
    +	/**
    +	 * Invoked when the current channel has received a WebSocket message.
    +	 *
    +	 * <p>Be sure to release the message object (default behavior).
    +	 *
    +	 * @param ctx the channel handler context
    +	 * @param parameters the REST parameters
    +	 * @param msg the message received
    +	 * @throws Exception if the message could not be processed.
    +	 */
    +	protected void messageReceived(ChannelHandlerContext ctx, M parameters, O msg) throws
Exception {
    +		ReferenceCountUtil.release(msg);
    --- End diff --
    
    I think I will change the behavior to be more consistent with `SimpleChannelInboundHandler`
and auto-release the message rather than forcing the subclass to release it.  Note that `channelRead0`
is being renamed to `messageReceived` in Netty 5.


> Create WebSocket handler (server)
> ---------------------------------
>
>                 Key: FLINK-7738
>                 URL: https://issues.apache.org/jira/browse/FLINK-7738
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management, Mesos
>            Reporter: Eron Wright 
>            Assignee: Eron Wright 
>
> An abstract handler is needed to support websocket communication.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message