hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [50/50] [abbrv] hadoop git commit: HDFS-8515. Implement HTTP/2 stream channels. Contributed by Duo Zhang.
Date Thu, 25 Jun 2015 01:08:23 GMT
HDFS-8515. Implement HTTP/2 stream channels. Contributed by Duo Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05643009
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05643009
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05643009

Branch: refs/heads/HDFS-7966
Commit: 0564300915f37dbf8ecb5b4c06feeae9fd312b24
Parents: a815cc1
Author: Haohui Mai <wheat9@apache.org>
Authored: Wed Jun 24 18:07:17 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Wed Jun 24 18:07:17 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   2 +
 .../hadoop-hdfs/CHANGES-HDFS-7966.txt           |   3 +
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   5 +
 .../web/PortUnificationServerHandler.java       |  27 +-
 .../datanode/web/dtp/DtpChannelHandler.java     |  47 ++++
 .../datanode/web/dtp/DtpHttp2FrameListener.java |  52 ----
 .../datanode/web/dtp/DtpHttp2Handler.java       |  34 ---
 .../hdfs/web/http2/Http2StreamChannel.java      | 268 +++++++++++++++++++
 .../hadoop/hdfs/web/http2/LastHttp2Message.java |  44 +++
 .../web/http2/ServerHttp2ConnectionHandler.java |  86 ++++++
 .../web/http2/ServerHttp2EventListener.java     | 135 ++++++++++
 .../datanode/web/dtp/Http2ResponseHandler.java  |  14 +-
 .../server/datanode/web/dtp/TestDtpHttp2.java   |   6 +-
 .../hdfs/web/http2/AbstractTestHttp2Server.java |  67 +++++
 .../hadoop/hdfs/web/http2/StreamListener.java   | 116 ++++++++
 .../hadoop/hdfs/web/http2/TestHttp2Server.java  | 140 ++++++++++
 .../web/http2/TestHttp2ServerMultiThread.java   | 207 ++++++++++++++
 hadoop-project/pom.xml                          |   7 +
 18 files changed, 1154 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index cde198e..0d664ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,5 @@ yarnregistry.pdf
 hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml
 hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml
 patchprocess/
+hadoop-hdfs-project/hadoop-hdfs/src/test/resources/common-version-info.properties
+hadoop-hdfs-project/hadoop-hdfs/src/test/resources/webapps

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt
new file mode 100644
index 0000000..4ec2793
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt
@@ -0,0 +1,3 @@
+HDFS-7966 AND RELATED CHANGES
+
+  HDFS-8515. Implement HTTP/2 stream channels. (Duo Zhang via wheat9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index f90644c..2d02c46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -196,6 +196,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>htrace-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty.http2</groupId>
+      <artifactId>http2-client</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-kms</artifactId>
       <classifier>classes</classifier>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
index 7ebc070..e5c5256 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
@@ -17,21 +17,24 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.web;
 
-import java.net.InetSocketAddress;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpHttp2Handler;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http2.Http2CodecUtil;
 import io.netty.handler.stream.ChunkedWriteHandler;
 
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpChannelHandler;
+import org.apache.hadoop.hdfs.web.http2.Http2StreamChannel;
+import org.apache.hadoop.hdfs.web.http2.ServerHttp2ConnectionHandler;
+
 /**
  * A port unification handler to support HTTP/1.1 and HTTP/2 on the same port.
  */
@@ -64,7 +67,15 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
   }
 
   private void configureHttp2(ChannelHandlerContext ctx) {
-    ctx.pipeline().addLast(new DtpHttp2Handler());
+    ctx.pipeline().addLast(
+      ServerHttp2ConnectionHandler.create(ctx.channel(),
+        new ChannelInitializer<Http2StreamChannel>() {
+
+          @Override
+          protected void initChannel(Http2StreamChannel ch) throws Exception {
+            ch.pipeline().addLast(new DtpChannelHandler());
+          }
+        }));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java
new file mode 100644
index 0000000..23847c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web.dtp;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.web.http2.LastHttp2Message;
+
+/**
+ * A dummy handler that just writes back a string message.
+ */
+@InterfaceAudience.Private
+public class DtpChannelHandler extends
+    SimpleChannelInboundHandler<Http2Headers> {
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, Http2Headers msg)
+      throws Exception {
+    ctx.write(new DefaultHttp2Headers().status(HttpResponseStatus.OK
+        .codeAsText()));
+    ctx.write(ctx.alloc().buffer()
+        .writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8)));
+    ctx.writeAndFlush(LastHttp2Message.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java
deleted file mode 100644
index 41e7cf4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.web.dtp;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.Http2ConnectionEncoder;
-import io.netty.handler.codec.http2.Http2Exception;
-import io.netty.handler.codec.http2.Http2FrameAdapter;
-import io.netty.handler.codec.http2.Http2Headers;
-
-import java.nio.charset.StandardCharsets;
-
-class DtpHttp2FrameListener extends Http2FrameAdapter {
-
-  private Http2ConnectionEncoder encoder;
-
-  public void encoder(Http2ConnectionEncoder encoder) {
-    this.encoder = encoder;
-  }
-
-  @Override
-  public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
-      Http2Headers headers, int streamDependency, short weight,
-      boolean exclusive, int padding, boolean endStream) throws Http2Exception {
-    encoder.writeHeaders(ctx, streamId,
-      new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText()), 0,
-      false, ctx.newPromise());
-    encoder.writeData(
-      ctx,
-      streamId,
-      ctx.alloc().buffer()
-          .writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8)), 0, true,
-      ctx.newPromise());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java
deleted file mode 100644
index 5b6f279..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.web.dtp;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import io.netty.handler.codec.http2.Http2ConnectionHandler;
-
-/**
- * The HTTP/2 handler.
- */
-@InterfaceAudience.Private
-public class DtpHttp2Handler extends Http2ConnectionHandler {
-
-  public DtpHttp2Handler() {
-    super(true, new DtpHttp2FrameListener());
-    ((DtpHttp2FrameListener) decoder().listener()).encoder(encoder());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
new file mode 100644
index 0000000..658ffe4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelOutboundBuffer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelConfig;
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.UnsupportedMessageTypeException;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.InternalThreadLocalMap;
+
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * A channel used for modeling an HTTP/2 stream.
+ * <p>
+ * We share the same event loop with the parent channel, so doBeginRead, doWrite
+ * and doClose will run in the same event loop thread. So no event loop
+ * switching is needed, and it is safe to call encoder.writeXXX directly in
+ * doWrite.
+ * <p>
+ * But the public methods (isOpen, isActive, ...) can be called outside the event
+ * loop, so the state field must be volatile.
+ */
+@InterfaceAudience.Private
+public class Http2StreamChannel extends AbstractChannel {
+
+  private static final ChannelMetadata METADATA = new ChannelMetadata(false);
+
+  private static final int MAX_READER_STACK_DEPTH = 8;
+
+  private final ChannelHandlerContext http2ConnHandlerCtx;
+  private final Http2Stream stream;
+  private final Http2ConnectionEncoder encoder;
+  private final DefaultChannelConfig config;
+  private final Queue<Object> inboundMessageQueue = new ArrayDeque<>();
+
+  private enum State {
+    OPEN, HALF_CLOSED_LOCAL, HALF_CLOSED_REMOTE, PRE_CLOSED, CLOSED
+  }
+
+  private volatile State state = State.OPEN;
+
+  public Http2StreamChannel(Channel parent, Http2Stream stream) {
+    super(parent);
+    this.http2ConnHandlerCtx =
+        parent.pipeline().context(Http2ConnectionHandler.class);
+    Http2ConnectionHandler connHandler =
+        (Http2ConnectionHandler) http2ConnHandlerCtx.handler();
+    this.stream = stream;
+    this.encoder = connHandler.encoder();
+    this.config = new DefaultChannelConfig(this);
+  }
+
+  @Override
+  public ChannelConfig config() {
+    return config;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return state != State.CLOSED;
+  }
+
+  @Override
+  public boolean isActive() {
+    // we create this channel after the HTTP/2 stream becomes active, so we do
+    // not have a separate 'active' state.
+    return isOpen();
+  }
+
+  @Override
+  public ChannelMetadata metadata() {
+    return METADATA;
+  }
+
+  private final class Http2Unsafe extends AbstractUnsafe {
+
+    @Override
+    public void connect(SocketAddress remoteAddress,
+        SocketAddress localAddress, ChannelPromise promise) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  protected AbstractUnsafe newUnsafe() {
+    return new Http2Unsafe();
+  }
+
+  @Override
+  protected boolean isCompatible(EventLoop loop) {
+    return true;
+  }
+
+  @Override
+  protected SocketAddress localAddress0() {
+    return parent().localAddress();
+  }
+
+  @Override
+  protected SocketAddress remoteAddress0() {
+    return parent().remoteAddress();
+  }
+
+  @Override
+  protected void doBind(SocketAddress localAddress) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void doDisconnect() throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void doClose() throws Exception {
+    if (stream.state() != Http2Stream.State.CLOSED) {
+      encoder.writeRstStream(http2ConnHandlerCtx, stream.id(),
+        Http2Error.INTERNAL_ERROR.code(), http2ConnHandlerCtx.newPromise());
+    }
+    state = State.CLOSED;
+  }
+
+  private final Runnable readTask = new Runnable() {
+
+    @Override
+    public void run() {
+      ChannelPipeline pipeline = pipeline();
+      int maxMessagesPerRead = config().getMaxMessagesPerRead();
+      for (int i = 0; i < maxMessagesPerRead; i++) {
+        Object m = inboundMessageQueue.poll();
+        if (m == null) {
+          break;
+        }
+        if (m == LastHttp2Message.get()) {
+          state =
+              state == State.HALF_CLOSED_LOCAL ? State.PRE_CLOSED
+                  : State.HALF_CLOSED_REMOTE;
+        }
+        pipeline.fireChannelRead(m);
+      }
+      pipeline.fireChannelReadComplete();
+    }
+  };
+
+  @Override
+  protected void doBeginRead() throws Exception {
+    State currentState = this.state;
+    if (currentState == State.CLOSED) {
+      throw new ClosedChannelException();
+    }
+    if (inboundMessageQueue.isEmpty()) {
+      return;
+    }
+    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
+    final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
+    if (stackDepth < MAX_READER_STACK_DEPTH) {
+      threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
+      try {
+        readTask.run();
+      } finally {
+        threadLocals.setLocalChannelReaderStackDepth(stackDepth);
+      }
+    } else {
+      eventLoop().execute(readTask);
+    }
+  }
+
+  @Override
+  protected void doWrite(ChannelOutboundBuffer in) throws Exception {
+    State currentState = this.state;
+    if (currentState == State.CLOSED) {
+      throw new ClosedChannelException();
+    }
+    boolean flush = false;
+    for (;;) {
+      Object msg = in.current();
+      if (msg == null) {
+        break;
+      }
+      if (msg == LastHttp2Message.get()) {
+        this.state =
+            currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED
+                : State.HALF_CLOSED_LOCAL;
+        encoder.writeData(http2ConnHandlerCtx, stream.id(), http2ConnHandlerCtx
+            .alloc().buffer(0), 0, true, http2ConnHandlerCtx.newPromise());
+      } else if (msg instanceof Http2Headers) {
+        encoder.writeHeaders(http2ConnHandlerCtx, stream.id(),
+          (Http2Headers) msg, 0, false, http2ConnHandlerCtx.newPromise());
+      } else if (msg instanceof ByteBuf) {
+        ByteBuf data = (ByteBuf) msg;
+        encoder.writeData(http2ConnHandlerCtx, stream.id(), data.retain(), 0,
+          false, http2ConnHandlerCtx.newPromise());
+      } else {
+        throw new UnsupportedMessageTypeException(msg, Http2Headers.class,
+            ByteBuf.class);
+      }
+      in.remove();
+      flush = true;
+    }
+    if (flush) {
+      http2ConnHandlerCtx.channel().flush();
+    }
+  }
+
+  /**
+   * Append a message to the inbound queue of this channel. You need to call
+   * {@link #read()} if you want to pass the message to handlers.
+   */
+  void writeInbound(Object msg) {
+    inboundMessageQueue.add(msg);
+  }
+
+  private static final ImmutableSet<State> REMOTE_SIDE_CLOSED_STATES =
+      ImmutableSet.of(State.HALF_CLOSED_REMOTE, State.PRE_CLOSED, State.CLOSED);
+
+  /**
+   * @return true if remote side finishes sending data to us.
+   */
+  public boolean remoteSideClosed() {
+    return REMOTE_SIDE_CLOSED_STATES.contains(state);
+  }
+
+  private static final ImmutableSet<State> LOCAL_SIDE_CLOSED_STATES =
+      ImmutableSet.of(State.HALF_CLOSED_LOCAL, State.PRE_CLOSED, State.CLOSED);
+
+  /**
+   * @return true if we finish sending data to remote side.
+   */
+  public boolean localSideClosed() {
+    return LOCAL_SIDE_CLOSED_STATES.contains(state);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java
new file mode 100644
index 0000000..b72b09a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.handler.codec.http.LastHttpContent;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used to tell an inbound handler that the remote side of an HTTP/2 stream is
+ * closed, or used by an outbound handler to tell the HTTP/2 stream to close
+ * local side.
+ * @see LastHttpContent#EMPTY_LAST_CONTENT
+ */
+@InterfaceAudience.Private
+public final class LastHttp2Message {
+
+  private static final LastHttp2Message INSTANCE = new LastHttp2Message();
+
+  private LastHttp2Message() {
+  }
+
+  /**
+   * Get the singleton {@code LastHttp2Message} instance.
+   */
+  public static LastHttp2Message get() {
+    return INSTANCE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
new file mode 100644
index 0000000..1ee733d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2FrameListener;
+import io.netty.handler.codec.http2.Http2FrameLogger;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2InboundFrameLogger;
+import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
+import io.netty.handler.logging.LogLevel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An {@link Http2ConnectionHandler} used at server side.
+ */
+@InterfaceAudience.Private
+public class ServerHttp2ConnectionHandler extends Http2ConnectionHandler {
+
+  private static final Log LOG = LogFactory
+      .getLog(ServerHttp2ConnectionHandler.class);
+
+  private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
+      LogLevel.INFO, ServerHttp2ConnectionHandler.class);
+
+  private ServerHttp2ConnectionHandler(Http2Connection connection,
+      Http2FrameReader frameReader, Http2FrameWriter frameWriter,
+      Http2FrameListener listener) {
+    super(connection, frameReader, frameWriter, listener);
+  }
+
+  /**
+   * Create and initialize a {@link ServerHttp2ConnectionHandler}.
+   * @param channel the parent channel that carries the HTTP/2 connection
+   * @param initializer used to set up the pipeline of each newly created
+   *          {@link Http2StreamChannel}
+   * @return the initialized {@link ServerHttp2ConnectionHandler}
+   */
+  public static ServerHttp2ConnectionHandler create(Channel channel,
+      ChannelInitializer<Http2StreamChannel> initializer) {
+    Http2Connection conn = new DefaultHttp2Connection(true);
+    ServerHttp2EventListener listener =
+        new ServerHttp2EventListener(channel, conn, initializer);
+    conn.addListener(listener);
+    Http2FrameReader frameReader;
+    Http2FrameWriter frameWriter;
+    if (LOG.isDebugEnabled()) {
+      frameReader =
+          new Http2InboundFrameLogger(new DefaultHttp2FrameReader(),
+              FRAME_LOGGER);
+      frameWriter =
+          new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),
+              FRAME_LOGGER);
+    } else {
+      frameReader = new DefaultHttp2FrameReader();
+      frameWriter = new DefaultHttp2FrameWriter();
+    }
+    return new ServerHttp2ConnectionHandler(conn, frameReader, frameWriter,
+        listener);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
new file mode 100644
index 0000000..72e3879
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2EventAdapter;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An HTTP/2 FrameListener and EventListener that manages
+ * {@link Http2StreamChannel}s: one sub-channel is created per active stream,
+ * and inbound HEADERS/DATA frames are forwarded into it.
+ * <p>
+ * We do not handle onRstStreamRead here; a stream that is being reset will
+ * also call onStreamClosed. The upper layer should not rely on a reset event.
+ */
+@InterfaceAudience.Private
+public class ServerHttp2EventListener extends Http2EventAdapter {
+
+  // The connection-level channel shared by all streams of this session.
+  private final Channel parentChannel;
+
+  // Initializes the pipeline of each newly created stream sub-channel.
+  private final ChannelInitializer<Http2StreamChannel> subChannelInitializer;
+
+  private final Http2Connection conn;
+
+  // Per-stream property slot used to attach the sub-channel to its stream.
+  private final PropertyKey subChannelPropKey;
+
+  public ServerHttp2EventListener(Channel parentChannel, Http2Connection conn,
+      ChannelInitializer<Http2StreamChannel> subChannelInitializer) {
+    this.parentChannel = parentChannel;
+    this.conn = conn;
+    this.subChannelInitializer = subChannelInitializer;
+    this.subChannelPropKey = conn.newKey();
+  }
+
+  /**
+   * Creates an {@link Http2StreamChannel} for the new stream, attaches it as
+   * a stream property and registers it on the parent channel's event loop.
+   */
+  @Override
+  public void onStreamActive(final Http2Stream stream) {
+    Http2StreamChannel subChannel =
+        new Http2StreamChannel(parentChannel, stream);
+    stream.setProperty(subChannelPropKey, subChannel);
+    subChannel.pipeline().addFirst(subChannelInitializer);
+    parentChannel.eventLoop().register(subChannel)
+        .addListener(new FutureListener<Void>() {
+
+          @Override
+          public void operationComplete(Future<Void> future) throws Exception {
+            if (!future.isSuccess()) {
+              // NOTE(review): on a failed registration only the property is
+              // removed; the sub-channel is not closed and the stream is not
+              // reset — confirm this cannot leak the channel.
+              stream.removeProperty(subChannelPropKey);
+            }
+          }
+
+        });
+
+  }
+
+  /**
+   * Closes the stream's sub-channel (if one was attached) when the stream
+   * closes, whether normally or via reset.
+   */
+  @Override
+  public void onStreamClosed(Http2Stream stream) {
+    Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey);
+    if (subChannel != null) {
+      subChannel.close();
+    }
+  }
+
+  /**
+   * Looks up the sub-channel attached to the given stream.
+   * NOTE(review): assumes conn.stream(streamId) is non-null for frames the
+   * codec delivers here — TODO confirm, otherwise this throws NPE instead of
+   * the intended stream error.
+   */
+  private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception {
+    Http2StreamChannel subChannel =
+        conn.stream(streamId).getProperty(subChannelPropKey);
+    if (subChannel == null) {
+      throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR,
+        "No sub channel found");
+    }
+    return subChannel;
+  }
+
+  /**
+   * Forwards an inbound message into the stream's sub-channel, appending the
+   * end-of-stream marker when the frame carried END_STREAM.
+   */
+  private void writeInbound(int streamId, Object msg, boolean endOfStream)
+      throws Http2Exception {
+    Http2StreamChannel subChannel = getSubChannel(streamId);
+    subChannel.writeInbound(msg);
+    if (endOfStream) {
+      subChannel.writeInbound(LastHttp2Message.get());
+    }
+    // With auto-read on, trigger delivery to the sub-channel's pipeline now;
+    // otherwise the upper layer is expected to call read() itself.
+    if (subChannel.config().isAutoRead()) {
+      subChannel.read();
+    }
+
+  }
+
+  @Override
+  public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
+      Http2Headers headers, int padding, boolean endOfStream)
+      throws Http2Exception {
+    writeInbound(streamId, headers, endOfStream);
+  }
+
+  // Priority-carrying variant delegates to the plain one; stream dependency,
+  // weight and exclusivity are ignored.
+  @Override
+  public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
+      Http2Headers headers, int streamDependency, short weight,
+      boolean exclusive, int padding, boolean endOfStream)
+      throws Http2Exception {
+    onHeadersRead(ctx, streamId, headers, padding, endOfStream);
+  }
+
+  /**
+   * Forwards a DATA frame (retained) into the sub-channel.
+   * @return the payload plus padding, i.e. the byte count reported back to
+   *         the codec as processed for flow control
+   */
+  @Override
+  public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
+      int padding, boolean endOfStream) throws Http2Exception {
+    int pendingBytes = data.readableBytes() + padding;
+    writeInbound(streamId, data.retain(), endOfStream);
+    return pendingBytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
index eb8b918..1e1acdd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
@@ -23,14 +23,15 @@ import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http2.HttpUtil;
 import io.netty.util.concurrent.Promise;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import java.util.concurrent.ConcurrentHashMap;
 
 public class Http2ResponseHandler extends
     SimpleChannelInboundHandler<FullHttpResponse> {
 
-  private Map<Integer, Promise<FullHttpResponse>> streamId2Promise =
-      new HashMap<>();
+  private ConcurrentMap<Integer, Promise<FullHttpResponse>> streamId2Promise =
+      new ConcurrentHashMap<>();
 
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg)
@@ -46,10 +47,7 @@ public class Http2ResponseHandler extends
       // this is the upgrade response message, just ignore it.
       return;
     }
-    Promise<FullHttpResponse> promise;
-    synchronized (this) {
-      promise = streamId2Promise.get(streamId);
-    }
+    Promise<FullHttpResponse> promise = streamId2Promise.get(streamId);
     if (promise == null) {
       System.err.println("Message received for unknown stream id " + streamId);
     } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
index 4e91004..eaa63a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
@@ -136,10 +136,8 @@ public class TestDtpHttp2 {
     request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(),
       streamId);
     Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise();
-    synchronized (RESPONSE_HANDLER) {
-      CHANNEL.writeAndFlush(request);
-      RESPONSE_HANDLER.put(streamId, promise);
-    }
+    RESPONSE_HANDLER.put(streamId, promise);
+    CHANNEL.writeAndFlush(request);
     assertEquals(HttpResponseStatus.OK, promise.get().status());
     ByteBuf content = promise.get().content();
     assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java
new file mode 100644
index 0000000..5f298f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import java.net.InetSocketAddress;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import org.eclipse.jetty.http2.ErrorCode;
+import org.eclipse.jetty.http2.api.Session;
+import org.eclipse.jetty.http2.client.HTTP2Client;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FuturePromise;
+
+/**
+ * Base class for HTTP/2 server tests: subclasses supply a Netty server via
+ * {@link #initServer()}; this class connects a Jetty HTTP/2 client to it and
+ * exposes the resulting {@link Session}.
+ */
+public abstract class AbstractTestHttp2Server {
+
+  // Accepts incoming connections; one thread is enough for tests.
+  protected EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+
+  // Handles I/O for accepted connections.
+  protected EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+  // Server channel bound by initServer().
+  protected Channel server;
+
+  // Jetty HTTP/2 client used to talk to the Netty-based server.
+  protected HTTP2Client client = new HTTP2Client();
+
+  // Client-side HTTP/2 session established in start().
+  protected Session session;
+
+  /** Bind and return the server channel under test. */
+  protected abstract Channel initServer();
+
+  /**
+   * Start the server, then connect the client to it on loopback and block
+   * until the HTTP/2 session is established.
+   */
+  protected final void start() throws Exception {
+    server = initServer();
+    client.start();
+    int port = ((InetSocketAddress) server.localAddress()).getPort();
+    FuturePromise<Session> sessionPromise = new FuturePromise<>();
+    client.connect(new InetSocketAddress("127.0.0.1", port),
+      new Session.Listener.Adapter(), sessionPromise);
+    session = sessionPromise.get();
+  }
+
+  /**
+   * Close the session, server and client, then shut down the event loops.
+   * NOTE(review): the shutdownGracefully() futures are not awaited, so event
+   * loop threads may still be winding down when this returns — confirm that
+   * is acceptable for the tests using this base class.
+   */
+  protected final void stop() throws Exception {
+    if (session != null) {
+      session.close(ErrorCode.NO_ERROR.code, "", new Callback.Adapter());
+    }
+    if (server != null) {
+      server.close();
+    }
+    client.stop();
+    bossGroup.shutdownGracefully();
+    workerGroup.shutdownGracefully();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java
new file mode 100644
index 0000000..7194490
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http.MetaData.Response;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.frames.DataFrame;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.ResetFrame;
+import org.eclipse.jetty.util.Callback;
+
+/**
+ * Client-side {@link Stream.Listener} that collects the response status and
+ * body of a single HTTP/2 stream, letting the test thread block until the
+ * stream has finished (end-of-stream or reset).
+ */
+public class StreamListener extends Stream.Listener.Adapter {
+
+  // True once the stream finished: END_STREAM seen, or the stream was reset.
+  private boolean finish = false;
+
+  // Accumulated response body bytes.
+  private byte[] buf = new byte[0];
+
+  // Response status; -1 until the response HEADERS frame arrives.
+  private int status = -1;
+
+  // True if the stream was reset by the peer.
+  private boolean reset;
+
+  @Override
+  public void onData(Stream stream, DataFrame frame, Callback callback) {
+    synchronized (this) {
+      if (reset) {
+        // Fail and bail out. The early return fixes a bug where we fell
+        // through, appended the payload and also called succeeded() on the
+        // same callback we had just failed.
+        callback.failed(new IllegalStateException("Stream already closed"));
+        return;
+      }
+      if (status == -1) {
+        // DATA before the response HEADERS is invalid for this listener.
+        callback
+            .failed(new IllegalStateException("Haven't received header yet"));
+        return;
+      }
+      // Append the frame payload to the accumulated body.
+      int bufLen = buf.length;
+      int newBufLen = bufLen + frame.getData().remaining();
+      buf = Arrays.copyOf(buf, newBufLen);
+      frame.getData().get(buf, bufLen, frame.getData().remaining());
+      if (frame.isEndStream()) {
+        finish = true;
+      }
+      notifyAll();
+      callback.succeeded();
+    }
+  }
+
+  @Override
+  public void onHeaders(Stream stream, HeadersFrame frame) {
+    synchronized (this) {
+      if (reset) {
+        throw new IllegalStateException("Stream already closed");
+      }
+      if (status != -1) {
+        throw new IllegalStateException("Header already received");
+      }
+      MetaData meta = frame.getMetaData();
+      if (!meta.isResponse()) {
+        throw new IllegalStateException("Received non-response header");
+      }
+      status = ((Response) meta).getStatus();
+      if (frame.isEndStream()) {
+        finish = true;
+        notifyAll();
+      }
+    }
+  }
+
+  @Override
+  public void onReset(Stream stream, ResetFrame frame) {
+    synchronized (this) {
+      reset = true;
+      finish = true;
+      notifyAll();
+    }
+  }
+
+  /**
+   * Blocks until the stream finishes, then returns the response status.
+   * @throws IOException if the stream was reset
+   */
+  public int getStatus() throws InterruptedException, IOException {
+    synchronized (this) {
+      while (!finish) {
+        wait();
+      }
+      if (reset) {
+        throw new IOException("Stream reset");
+      }
+      return status;
+    }
+  }
+
+  /**
+   * Blocks until the stream finishes, then returns the full response body.
+   * @throws IOException if the stream was reset
+   */
+  public byte[] getData() throws InterruptedException, IOException {
+    synchronized (this) {
+      while (!finish) {
+        wait();
+      }
+      if (reset) {
+        throw new IOException("Stream reset");
+      }
+      return buf;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
new file mode 100644
index 0000000..6a8495b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.ErrorCode;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.frames.DataFrame;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PriorityFrame;
+import org.eclipse.jetty.http2.frames.ResetFrame;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FuturePromise;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHttp2Server extends AbstractTestHttp2Server {
+
+  // Incremented once per stream sub-channel that goes inactive.
+  private final AtomicInteger handlerClosedCount = new AtomicInteger(0);
+
+  /**
+   * Replies 200 OK to the request headers and echoes every subsequent
+   * message back to the client.
+   */
+  private final class HelloWorldHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+      if (msg instanceof Http2Headers) {
+        ctx.writeAndFlush(new DefaultHttp2Headers()
+            .status(HttpResponseStatus.OK.codeAsText()));
+      } else {
+        // Echo back; retained because the write consumes a reference.
+        ctx.writeAndFlush(ReferenceCountUtil.retain(msg));
+      }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      handlerClosedCount.incrementAndGet();
+    }
+
+  }
+
+  /** Netty HTTP/2 server whose per-stream pipeline is a HelloWorldHandler. */
+  @Override
+  protected Channel initServer() {
+    return new ServerBootstrap().group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(new ChannelInitializer<Channel>() {
+
+          @Override
+          protected void initChannel(Channel ch) throws Exception {
+            ch.pipeline().addLast(
+              ServerHttp2ConnectionHandler.create(ch,
+                new ChannelInitializer<Http2StreamChannel>() {
+
+                  @Override
+                  protected void initChannel(Http2StreamChannel ch)
+                      throws Exception {
+                    ch.pipeline().addLast(new HelloWorldHandler());
+                  }
+                }));
+          }
+
+        }).bind(0).syncUninterruptibly().channel();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stop();
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException,
+      IOException {
+    HttpFields fields = new HttpFields();
+    fields.put(HttpHeader.C_METHOD, HttpMethod.GET.asString());
+    fields.put(HttpHeader.C_PATH, "/");
+    // First stream: send "Hello World" as the request body and expect it to
+    // be echoed back.
+    FuturePromise<Stream> streamPromise = new FuturePromise<>();
+    StreamListener listener = new StreamListener();
+    session.newStream(new HeadersFrame(1, new MetaData(
+        org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame(
+        1, 0, 1, false), false), streamPromise, listener);
+    Stream stream = streamPromise.get();
+    stream.data(
+      new DataFrame(stream.getId(), ByteBuffer.wrap("Hello World"
+          .getBytes(StandardCharsets.UTF_8)), true), new Callback.Adapter());
+    assertEquals("Hello World", new String(listener.getData(),
+        StandardCharsets.UTF_8));
+
+    // Second stream: reset it immediately; the server should still close the
+    // per-stream channel, bringing the closed count to 2.
+    streamPromise = new FuturePromise<>();
+    listener = new StreamListener();
+    session.newStream(new HeadersFrame(1, new MetaData(
+        org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame(
+        1, 0, 1, false), false), streamPromise, listener);
+    stream = streamPromise.get();
+    stream.reset(new ResetFrame(stream.getId(), ErrorCode.NO_ERROR.code),
+      new Callback.Adapter());
+    // NOTE(review): sleep-then-assert is timing-dependent and may be flaky on
+    // a slow machine — consider polling with a timeout instead.
+    Thread.sleep(1000);
+    assertEquals(2, handlerClosedCount.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
new file mode 100644
index 0000000..e583ca3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetector.Level;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.ErrorCode;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.frames.DataFrame;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PriorityFrame;
+import org.eclipse.jetty.http2.frames.ResetFrame;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FuturePromise;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TestHttp2ServerMultiThread extends AbstractTestHttp2Server {
+
+  /**
+   * First handler on each stream: replies 200 OK to the request headers, then
+   * replaces itself with an EchoHandler + EndStreamHandler pair for the rest
+   * of the stream.
+   */
+  private final class DispatchHandler extends
+      SimpleChannelInboundHandler<Http2Headers> {
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Http2Headers msg)
+        throws Exception {
+      ctx.writeAndFlush(new DefaultHttp2Headers().status(HttpResponseStatus.OK
+          .codeAsText()));
+      ctx.pipeline().remove(this)
+          .addLast(new EchoHandler(), new EndStreamHandler());
+    }
+  }
+
+  /** Echoes every DATA payload back and counts channel shutdowns. */
+  private final class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
+        throws Exception {
+      // Copy out of the inbound buffer so the echoed buffer is owned by the
+      // write; the inbound msg is released after this method returns.
+      ByteBuf out = msg.readBytes(msg.readableBytes());
+      ctx.writeAndFlush(out);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      // One increment per closed stream; checked against requestCount in
+      // test().
+      handlerClosedCount.incrementAndGet();
+    }
+
+  }
+
+  /** Mirrors the client's end-of-stream marker back to finish the response. */
+  private final class EndStreamHandler extends
+      SimpleChannelInboundHandler<LastHttp2Message> {
+
+    @Override
+    protected void
+        channelRead0(ChannelHandlerContext ctx, LastHttp2Message msg)
+            throws Exception {
+      ctx.writeAndFlush(msg);
+    }
+
+  }
+
+  private final AtomicInteger handlerClosedCount = new AtomicInteger(0);
+
+  private int concurrency = 10;
+
+  // Client worker pool issuing the echo requests; daemon threads so a hung
+  // test cannot block JVM exit.
+  private ExecutorService executor = Executors.newFixedThreadPool(concurrency,
+    new ThreadFactoryBuilder().setNameFormat("Echo-Client-%d").setDaemon(true)
+        .build());
+
+  private int requestCount = 10000;
+
+  /** Netty HTTP/2 server whose per-stream pipeline starts with a DispatchHandler. */
+  @Override
+  protected Channel initServer() {
+    return new ServerBootstrap().group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(new ChannelInitializer<Channel>() {
+
+          @Override
+          protected void initChannel(Channel ch) throws Exception {
+            ch.pipeline().addLast(
+              ServerHttp2ConnectionHandler.create(ch,
+                new ChannelInitializer<Http2StreamChannel>() {
+
+                  @Override
+                  protected void initChannel(Http2StreamChannel ch)
+                      throws Exception {
+                    ch.pipeline().addLast(new DispatchHandler());
+                  }
+                }));
+          }
+
+        }).bind(0).syncUninterruptibly().channel();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // Detect ByteBuf leaks in the echo path.
+    ResourceLeakDetector.setLevel(Level.ADVANCED);
+    start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    executor.shutdownNow();
+    stop();
+  }
+
+  /**
+   * Runs one request: 20% of the time the stream is reset right after the
+   * headers; otherwise one or two random DATA frames are sent and the echoed
+   * body and 200 status are verified.
+   */
+  private void testEcho() throws InterruptedException, ExecutionException,
+      IOException {
+    HttpFields fields = new HttpFields();
+    fields.put(HttpHeader.C_METHOD, HttpMethod.GET.asString());
+    fields.put(HttpHeader.C_PATH, "/");
+    FuturePromise<Stream> streamPromise = new FuturePromise<>();
+    StreamListener listener = new StreamListener();
+    session.newStream(new HeadersFrame(1, new MetaData(
+        org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame(
+        1, 0, 1, false), false), streamPromise, listener);
+    Stream stream = streamPromise.get();
+    if (ThreadLocalRandom.current().nextInt(5) < 1) { // 20%
+      stream.reset(new ResetFrame(stream.getId(), ErrorCode.NO_ERROR.code),
+        new Callback.Adapter());
+    } else {
+      int numFrames = ThreadLocalRandom.current().nextInt(1, 3);
+      ByteArrayOutputStream msg = new ByteArrayOutputStream();
+      for (int i = 0; i < numFrames; i++) {
+        byte[] frame = new byte[ThreadLocalRandom.current().nextInt(10, 100)];
+        ThreadLocalRandom.current().nextBytes(frame);
+        stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(frame),
+            i == numFrames - 1), new Callback.Adapter());
+        msg.write(frame);
+      }
+      assertEquals(HttpStatus.OK_200, listener.getStatus());
+      assertArrayEquals(msg.toByteArray(), listener.getData());
+    }
+  }
+
+  /**
+   * Fires requestCount echo requests from concurrency client threads and
+   * asserts that every per-stream EchoHandler saw channelInactive.
+   * NOTE(review): the final sleep-then-assert assumes all server-side
+   * channels close within one second of the last response — confirm this is
+   * stable under load.
+   */
+  @Test
+  public void test() throws InterruptedException {
+    final AtomicBoolean succ = new AtomicBoolean(true);
+    for (int i = 0; i < requestCount; i++) {
+      executor.execute(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            testEcho();
+          } catch (Throwable t) {
+            t.printStackTrace();
+            succ.set(false);
+          }
+        }
+      });
+    }
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
+    assertTrue(succ.get());
+    Thread.sleep(1000);
+    assertEquals(requestCount, handlerClosedCount.get());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05643009/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 16d2058..981e0e2 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -583,6 +583,13 @@
       </dependency>
 
       <dependency>
+        <groupId>org.eclipse.jetty.http2</groupId>
+        <artifactId>http2-client</artifactId>
+        <version>9.3.0.M2</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
         <groupId>com.twitter</groupId>
         <artifactId>hpack</artifactId>
         <version>0.11.0</version>


Mime
View raw message