camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nferr...@apache.org
Subject [5/7] camel git commit: CAMEL-11237: Changes based on @nicolaferraro code review comments
Date Thu, 25 May 2017 12:05:31 GMT
CAMEL-11237: Changes based on @nicolaferraro code review comments

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/90bb2136
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/90bb2136
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/90bb2136

Branch: refs/heads/master
Commit: 90bb213612c24ebf29b8cc8891cb283d7aa0225a
Parents: 3025f91
Author: Dmitry Volodin <dmvolod@gmail.com>
Authored: Tue May 23 18:45:00 2017 +0300
Committer: Dmitry Volodin <dmvolod@gmail.com>
Committed: Tue May 23 18:45:00 2017 +0300

----------------------------------------------------------------------
 .../src/main/docs/grpc-component.adoc           |  6 ++--
 .../camel/component/grpc/GrpcConfiguration.java | 35 ++++++++++++++++++++
 .../camel/component/grpc/GrpcConsumer.java      | 18 ++++++----
 .../GrpcRequestAggregationStreamObserver.java   | 35 ++++++++++++++------
 .../GrpcRequestPropagationStreamObserver.java   | 34 ++++++++++++++++---
 .../grpc/GrpcConsumerPropagationTest.java       | 35 +++++++++++++-------
 6 files changed, 127 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/docs/grpc-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc b/components/camel-grpc/src/main/docs/grpc-component.adoc
index d12bf85..a74c6df 100644
--- a/components/camel-grpc/src/main/docs/grpc-component.adoc
+++ b/components/camel-grpc/src/main/docs/grpc-component.adoc
@@ -47,7 +47,7 @@ with the following path and query parameters:
 | **service** | *Required* Fully qualified service name from the protocol buffer descriptor
file (package dot service definition name) |  | String
 |=======================================================================
 
-#### Query Parameters (10 parameters):
+#### Query Parameters (12 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -55,6 +55,8 @@ with the following path and query parameters:
 | **host** (common) | The gRPC server host name |  | String
 | **port** (common) | The gRPC server port |  | int
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing
Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming
messages or the likes will now be processed as a message and handled by the routing Error
Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal
with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
+| **forwardOnCompleted** (consumer) | Determines if onCompleted events should be pushed to
the Camel route. | false | boolean
+| **forwardOnError** (consumer) | Determines if onError events should be pushed to the Camel
route. Exceptions will be set as message body. | false | boolean
 | **processingStrategy** (consumer) | This option specifies the top-level strategy for processing
service requests and responses in streaming mode. If an aggregation strategy is selected all
requests will be accumulated in the list then transferred to the flow and the accumulated
responses will be sent to the sender. If a propagation strategy is selected request is sent
to the stream and the response will be immediately sent back to the sender. |  | GrpcProcessingStrategies
 | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice
if the option bridgeErrorHandler is enabled then this options is not in use. By default the
consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored.
|  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an
exchange. |  | ExchangePattern
@@ -92,7 +94,7 @@ The table below shows the types of objects in the message body, depending on the
 |Header name |Description|Possible values
 
 |*CamelGrpcMethodName*|Method name handled by the consumer service|
-|*CamelGrpcEventType*|Received event type from the sended request|onNext, onCompleted or
onError
+|*CamelGrpcEventType*|Received event type from the sent request|onNext, onCompleted or onError
 |*CamelGrpcUserAgent*|If provided, the given agent will prepend the gRPC library's user agent
information|
 
 |=======================================================================

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
index c298c02..123de61 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
@@ -27,18 +27,30 @@ public class GrpcConfiguration {
     @UriPath
     @Metadata(required = "true")
     private String service;
+    
     @UriParam(label = "producer")
     private String method;
+    
     @UriParam
     private String host;
+    
     @UriParam
     private int port;
+    
     @UriParam(label = "producer")
     private String target;
+    
     @UriParam(label = "producer", defaultValue = "true")
     private Boolean usePlainText = true;
+    
     @UriParam(label = "consumer")
     private GrpcProcessingStrategies processingStrategy = GrpcProcessingStrategies.PROPAGATION;
+    
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean forwardOnCompleted;
+
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean forwardOnError;
 
     private String serviceName;
     private String servicePackage;
@@ -127,6 +139,29 @@ public class GrpcConfiguration {
     }
 
     /**
+     * Determines if onCompleted events should be pushed to the Camel route.
+     */
+    public void setForwardOnCompleted(boolean forwardOnCompleted) {
+        this.forwardOnCompleted = forwardOnCompleted;
+    }
+
+    public boolean isForwardOnCompleted() {
+        return forwardOnCompleted;
+    }
+
+    /**
+     * Determines if onError events should be pushed to the Camel route.
+     * Exceptions will be set as message body.
+     */
+    public void setForwardOnError(boolean forwardOnError) {
+        this.forwardOnError = forwardOnError;
+    }
+
+    public boolean isForwardOnError() {
+        return forwardOnError;
+    }
+
+    /**
      * The service name extracted from the full service name
      */
     protected String getServiceName() {

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
index 3bcdec0..27a7d4a 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
@@ -111,17 +111,21 @@ public class GrpcConsumer extends DefaultConsumer {
     }
     
     public void onCompleted(Exchange exchange) {
-        exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
-        doSend(exchange, done -> {
-        });
+        if (configuration.isForwardOnCompleted()) {
+            exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
+            doSend(exchange, done -> {
+            });
+        }
     }
 
     public void onError(Exchange exchange, Throwable error) {
-        exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
-        exchange.getIn().setBody(error);
+        if (configuration.isForwardOnError()) {
+            exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
+            exchange.getIn().setBody(error);
         
-        doSend(exchange, done -> {
-        });
+            doSend(exchange, done -> {
+            });
+        }
     }
         
     private boolean doSend(Exchange exchange, AsyncCallback callback) {

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
index 145029e..9f79b22 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.grpc.server;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import io.grpc.stub.StreamObserver;
 import org.apache.camel.component.grpc.GrpcConsumer;
@@ -47,23 +48,37 @@ public class GrpcRequestAggregationStreamObserver extends GrpcRequestAbstractStr
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void onCompleted() {
+        CountDownLatch latch = new CountDownLatch(1);
+        Object responseBody = null;
+        
         exchange.getIn().setBody(requestList);
         exchange.getIn().setHeaders(headers);
 
         consumer.process(exchange, doneSync -> {
+            latch.countDown();
         });
+        
+        try {
+            latch.await();
+            
+            if (exchange.hasOut()) {
+                responseBody = exchange.getOut().getBody();
+            } else {
+                responseBody = exchange.getIn().getBody();
+            }
 
-        Object responseBody = exchange.getIn().getBody();
-        if (responseBody instanceof List) {
-            List<Object> responseList = (List<Object>)responseBody;
-            responseList.forEach((responseItem) -> {
-                responseObserver.onNext(responseItem);
-            });
-        } else {
-            responseObserver.onNext(responseBody);
+            if (responseBody instanceof List) {
+                List<?> responseList = (List<?>)responseBody;
+                responseList.forEach((responseItem) -> {
+                    responseObserver.onNext(responseItem);
+                });
+            } else {
+                responseObserver.onNext(responseBody);
+            }
+            responseObserver.onCompleted();
+        } catch (InterruptedException e) {
+            responseObserver.onError(e);
         }
-        responseObserver.onCompleted();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
index ae51100..632ff2d 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.grpc.server;
 
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import io.grpc.stub.StreamObserver;
 import org.apache.camel.component.grpc.GrpcConsumer;
@@ -34,17 +36,39 @@ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStr
 
     @Override
     public void onNext(Object request) {
+        CountDownLatch latch = new CountDownLatch(1);
+        Object responseBody = null;
+        
         exchange = endpoint.createExchange();
         exchange.getIn().setBody(request);
         exchange.getIn().setHeaders(headers);
+        
         consumer.process(exchange, doneSync -> {
+            latch.countDown();
         });
-        if (exchange.hasOut()) {
-            responseObserver.onNext(exchange.getOut().getBody());
-        } else {
-            responseObserver.onNext(exchange.getIn().getBody());
+        
+        try {
+            latch.await();
+            
+            if (exchange.hasOut()) {
+                responseBody = exchange.getOut().getBody();
+            } else {
+                responseBody = exchange.getIn().getBody();
+            }
+            
+            if (responseBody instanceof List) {
+                List<?> responseList = (List<?>)responseBody;
+                responseList.forEach((responseItem) -> {
+                    responseObserver.onNext(responseItem);
+                });
+            } else {
+                responseObserver.onNext(responseBody);
+            }
+            responseObserver.onCompleted();
+
+        } catch (InterruptedException e) {
+            responseObserver.onError(e);
         }
-        responseObserver.onCompleted();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
index d4a0641..e7cb8c7 100644
--- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
+++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
@@ -35,23 +35,28 @@ import org.slf4j.LoggerFactory;
 public class GrpcConsumerPropagationTest extends CamelTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerPropagationTest.class);
 
-    private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_ASYNC_NEXT_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
     private static final int GRPC_TEST_PING_ID = 1;
     private static final String GRPC_TEST_PING_VALUE = "PING";
     private static final String GRPC_TEST_PONG_VALUE = "PONG";
 
-    private ManagedChannel asyncRequestChannel;
-    private PingPongGrpc.PingPongStub asyncNonBlockingStub;
+    private ManagedChannel asyncOnNextChannel;
+    private ManagedChannel asyncOnCompletedChannel;
+    private PingPongGrpc.PingPongStub asyncOnNextStub;
+    private PingPongGrpc.PingPongStub asyncOnCompletedStub;
 
     @Before
     public void startGrpcChannels() {
-        asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build();
-        asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
+        asyncOnNextChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_NEXT_REQUEST_TEST_PORT).usePlaintext(true).build();
+        asyncOnCompletedChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT).usePlaintext(true).build();
+        asyncOnNextStub = PingPongGrpc.newStub(asyncOnNextChannel);
+        asyncOnCompletedStub = PingPongGrpc.newStub(asyncOnCompletedChannel);
     }
 
     @After
     public void stopGrpcChannels() throws Exception {
-        asyncRequestChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+        asyncOnNextChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
     }
 
     @Test
@@ -62,14 +67,15 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport {
         PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
         PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
 
-        StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver);
+        StreamObserver<PingRequest> requestObserver = asyncOnNextStub.pingAsyncSync(responseObserver);
         requestObserver.onNext(pingRequest);
         latch.await(5, TimeUnit.SECONDS);
 
-        MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation");
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:async-on-next-propagation");
         mockEndpoint.expectedMessageCount(1);
         mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT);
         mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER,
"pingAsyncSync");
+        mockEndpoint.assertIsSatisfied();
         
         PongResponse pongResponse = responseObserver.getPongResponse();
         assertNotNull(pongResponse);
@@ -85,14 +91,15 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport {
         PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
         PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
 
-        StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
+        StreamObserver<PingRequest> requestObserver = asyncOnCompletedStub.pingAsyncAsync(responseObserver);
         requestObserver.onCompleted();
         latch.await(5, TimeUnit.SECONDS);
 
-        MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation");
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:async-on-completed-propagation");
         mockEndpoint.expectedMessageCount(1);
         mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
         mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER,
"pingAsyncAsync");
+        mockEndpoint.assertIsSatisfied();
     }
 
     @Override
@@ -101,8 +108,12 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport {
             @Override
             public void configure() {
                 
-                from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port="
+ GRPC_ASYNC_REQUEST_TEST_PORT)
-                    .to("mock:async-propagation")
+                from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port="
+ GRPC_ASYNC_NEXT_REQUEST_TEST_PORT)
+                    .to("mock:async-on-next-propagation")
+                    .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+                
+                from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&forwardOnCompleted=true&host=localhost&port="
+ GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT)
+                    .to("mock:async-on-completed-propagation")
                     .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
             }
         };


Mime
View raw message