zipkin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adrianc...@apache.org
Subject [incubator-zipkin] branch master updated: Vendors in AbstractUnsafeUnaryGrpcService temporarily to remove dependency on armeria release. (#2593)
Date Wed, 15 May 2019 11:42:50 GMT
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git


The following commit(s) were added to refs/heads/master by this push:
     new daf95a8  Vendors in AbstractUnsafeUnaryGrpcService temporarily to remove dependency on armeria release. (#2593)
daf95a8 is described below

commit daf95a8962ac7e7a6cbcc9c073a80eade2f71ef4
Author: Anuraag Agrawal <anuraaga@gmail.com>
AuthorDate: Wed May 15 20:42:46 2019 +0900

    Vendors in AbstractUnsafeUnaryGrpcService temporarily to remove dependency on armeria release. (#2593)
---
 .../internal/AbstractUnsafeUnaryGrpcService.java   | 129 +++++++++++++++++++++
 .../server/internal/ZipkinGrpcCollector.java       |  30 +++--
 2 files changed, 148 insertions(+), 11 deletions(-)

diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/AbstractUnsafeUnaryGrpcService.java b/zipkin-server/src/main/java/zipkin2/server/internal/AbstractUnsafeUnaryGrpcService.java
new file mode 100644
index 0000000..051ee7e
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/AbstractUnsafeUnaryGrpcService.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright 2019 LINE Corporation
+ *
+ * LINE Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package zipkin2.server.internal;
+
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpHeaderNames;
+import com.linecorp.armeria.common.HttpHeadersBuilder;
+import com.linecorp.armeria.common.HttpRequest;
+import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.common.HttpStatus;
+import com.linecorp.armeria.common.ResponseHeaders;
+import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer;
+import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer.ByteBufOrStream;
+import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer.Listener;
+import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageFramer;
+import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
+import com.linecorp.armeria.common.grpc.protocol.GrpcHeaderNames;
+import com.linecorp.armeria.common.grpc.protocol.GrpcTrailersUtil;
+import com.linecorp.armeria.server.AbstractHttpService;
+import com.linecorp.armeria.server.ServiceRequestContext;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Temporarily copied from Armeria to allow using before it's released.
+ */
+abstract class AbstractUnsafeUnaryGrpcService extends AbstractHttpService {
+
+  private static final ResponseHeaders RESPONSE_HEADERS =
+    ResponseHeaders.of(HttpStatus.OK,
+      HttpHeaderNames.CONTENT_TYPE, "application/grpc+proto",
+      GrpcHeaderNames.GRPC_ENCODING, "identity");
+
+  /**
+   * Returns an unframed response message to return to the client, given an unframed request message. It is
+   * expected that the implementation has the logic to know how to parse the request and serialize a response
+   * into {@link ByteBuf}. The returned {@link ByteBuf} will be framed and returned to the client.
+   */
+  protected abstract CompletableFuture<ByteBuf> handleMessage(ByteBuf message);
+
+  @Override
+  protected final HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
+    final CompletableFuture<HttpResponse> responseFuture =
+      req.aggregateWithPooledObjects(ctx.contextAwareEventLoop(), ctx.alloc())
+        .thenCompose(msg -> deframeMessage(msg.content(), ctx.alloc()))
+        .thenCompose(this::handleMessage)
+        .thenApply(responseMessage -> {
+          final ArmeriaMessageFramer framer = new ArmeriaMessageFramer(
+            ctx.alloc(), Integer.MAX_VALUE);
+          final HttpData framed = framer.writePayload(responseMessage);
+          return HttpResponse.of(
+            RESPONSE_HEADERS,
+            framed,
+            GrpcTrailersUtil.statusToTrailers(/* OK */ 0, null, true).build());
+        })
+        .exceptionally(t -> {
+          final HttpHeadersBuilder trailers;
+          if (t instanceof ArmeriaStatusException) {
+            ArmeriaStatusException statusException = (ArmeriaStatusException) t;
+            trailers = GrpcTrailersUtil.statusToTrailers(
+              statusException.getCode(), statusException.getMessage(), false);
+          } else {
+            trailers = GrpcTrailersUtil.statusToTrailers(
+              /* INTERNAL */ 13, t.getMessage(), false);
+          }
+          return HttpResponse.of(trailers.build());
+        });
+
+    return HttpResponse.from(responseFuture);
+  }
+
+  private CompletableFuture<ByteBuf> deframeMessage(HttpData framed, ByteBufAllocator alloc) {
+    final CompletableFuture<ByteBuf> deframed = new CompletableFuture<>();
+    try (ArmeriaMessageDeframer deframer = new ArmeriaMessageDeframer(
+      new Listener() {
+        @Override
+        public void messageRead(ByteBufOrStream message) {
+          // Compression not supported.
+          assert message.buf() != null;
+          deframed.complete(message.buf());
+        }
+
+        @Override
+        public void endOfStream() {
+          if (!deframed.isDone()) {
+            deframed.complete(Unpooled.EMPTY_BUFFER);
+          }
+        }
+      },
+      Integer.MAX_VALUE,
+      alloc)) {
+      deframer.request(1);
+      deframer.deframe(framed, true);
+    }
+    return deframed;
+  }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
index 9ae9a94..a0918ce 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
@@ -16,12 +16,15 @@
  */
 package zipkin2.server.internal;
 
-import com.linecorp.armeria.common.grpc.protocol.AbstractUnaryGrpcService;
 import com.linecorp.armeria.spring.ArmeriaServerConfigurator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import zipkin2.Callback;
+import zipkin2.Span;
 import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.collector.Collector;
 import zipkin2.collector.CollectorMetrics;
@@ -31,7 +34,6 @@ import zipkin2.storage.StorageComponent;
 /** Collector for receiving spans on a gRPC endpoint. */
 @ConditionalOnProperty(name = "zipkin.collector.grpc.enabled") // disabled by default
 final class ZipkinGrpcCollector {
-  static final byte[] EMPTY = new byte[0];
 
   @Bean ArmeriaServerConfigurator grpcCollectorConfigurator(StorageComponent storage,
     CollectorSampler sampler, CollectorMetrics metrics) {
@@ -46,7 +48,7 @@ final class ZipkinGrpcCollector {
       sb.service("/zipkin.proto3.SpanService/Report", new SpanService(collector, grpcMetrics));
   }
 
-  static final class SpanService extends AbstractUnaryGrpcService {
+  static final class SpanService extends AbstractUnsafeUnaryGrpcService {
 
     final Collector collector;
     final CollectorMetrics metrics;
@@ -56,24 +58,30 @@ final class ZipkinGrpcCollector {
       this.metrics = metrics;
     }
 
-    @Override protected CompletableFuture<byte[]> handleMessage(byte[] bytes) {
+    @Override protected CompletableFuture<ByteBuf> handleMessage(ByteBuf bytes) {
       metrics.incrementMessages();
-      metrics.incrementBytes(bytes.length);
+      metrics.incrementBytes(bytes.readableBytes());
 
-      if (bytes.length == 0) {
+      if (!bytes.isReadable()) {
         return CompletableFuture.completedFuture(bytes); // lenient on empty messages
       }
-      CompletableFutureCallback result = new CompletableFutureCallback();
-      collector.acceptSpans(bytes, SpanBytesDecoder.PROTO3, result);
-      return result;
+
+      try {
+        CompletableFutureCallback result = new CompletableFutureCallback();
+        List<Span> spans = SpanBytesDecoder.PROTO3.decodeList(bytes.nioBuffer());
+        collector.accept(spans, result);
+        return result;
+      } finally {
+        bytes.release();
+      }
     }
   }
 
-  static final class CompletableFutureCallback extends CompletableFuture<byte[]>
+  static final class CompletableFutureCallback extends CompletableFuture<ByteBuf>
     implements Callback<Void> {
 
     @Override public void onSuccess(Void value) {
-      complete(EMPTY);
+      complete(Unpooled.EMPTY_BUFFER);
     }
 
     @Override public void onError(Throwable t) {


Mime
View raw message