camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [05/28] camel git commit: Fixed the producer which now should be handling the async and sync flows in an improved way.
Date Fri, 25 Nov 2016 10:00:44 GMT
Fixed the producer which now should be handling the async and sync flows in an improved way.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0bd0f74
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0bd0f74
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0bd0f74

Branch: refs/heads/master
Commit: d0bd0f7449f63893b0f33a3444d91a4a8e0f1acb
Parents: 8757400
Author: gilfernandes <gil.fernandes@gmail.com>
Authored: Tue Nov 22 15:04:17 2016 +0000
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Fri Nov 25 10:05:36 2016 +0100

----------------------------------------------------------------------
 .../component/firebase/FirebaseEndpoint.java    | 14 ++++-----
 .../component/firebase/FirebaseProducer.java    | 31 +++++++++----------
 .../firebase/FirebaseProducerTest.java          | 32 +++++++++++++++-----
 3 files changed, 47 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
index 709785e..41d7a61 100644
--- a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
+++ b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
@@ -52,10 +52,10 @@ public class FirebaseEndpoint extends DefaultEndpoint {
     @Metadata(required = "false")
     private String keyName = "firebaseKey";
 
-    @UriParam(defaultValue = "async", description = "If true, the save or update request (set value in Firebase terms) "
+    @UriParam(defaultValue = "reply", description = "If true, the save or update request (set value in Firebase terms) "
             + "is fired and the reply will be ignored, else the routing thread will wait and the reply will be saved in the exchange message")
     @Metadata(required = "false")
-    private boolean async;
+    private boolean reply;
 
     public FirebaseEndpoint(String uri, FirebaseComponent firebaseComponent, FirebaseConfig firebaseConfig) {
         super(uri, firebaseComponent);
@@ -64,7 +64,7 @@ public class FirebaseEndpoint extends DefaultEndpoint {
         this.setServiceAccountFile(firebaseConfig.getServiceAccountFile());
         this.databaseUrl = firebaseConfig.getDatabaseUrl();
         final String keyName = firebaseConfig.getKeyName();
-        this.setAsync(firebaseConfig.isAsync());
+        this.setReply(firebaseConfig.isAsync());
         if (keyName != null) {
             this.setKeyName(keyName);
         }
@@ -110,12 +110,12 @@ public class FirebaseEndpoint extends DefaultEndpoint {
         this.keyName = keyName;
     }
 
-    public boolean isAsync() {
-        return async;
+    public boolean isReply() {
+        return reply;
     }
 
-    public void setAsync(boolean async) {
-        this.async = async;
+    public void setReply(boolean reply) {
+        this.reply = reply;
     }
 
     public FirebaseApp getFirebaseApp() {

http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
index ef8f8a6..33ba39b 100644
--- a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
+++ b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
@@ -22,7 +22,6 @@ import com.google.firebase.database.FirebaseDatabase;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.Processor;
 import org.apache.camel.component.firebase.exception.DatabaseErrorException;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.slf4j.Logger;
@@ -43,35 +42,35 @@ public class FirebaseProducer extends DefaultAsyncProducer {
         rootReference = endpoint.getRootReference();
     }
 
-    /**
-     * Processes the message exchange.
-     * Similar to {@link Processor#process}, but the caller supports having the exchange asynchronously processed.
-     * <p/>
-     * If there was a failure processing then the caused {@link Exception} would be set on the {@link Exchange}.
-     *
-     * @param exchange the message exchange
-     * @param callback the {@link AsyncCallback} will be invoked when the processing of the exchange is completed.
-     *                 If the exchange is completed synchronously, then the callback is also invoked synchronously.
-     *                 The callback should therefore be careful of starting recursive loop.
-     * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
-     */
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         final Message in = exchange.getIn();
+        final Message out = exchange.getOut();
         String firebaseKey = (String) in.getHeader(endpoint.getKeyName());
         Object value = in.getBody();
         DatabaseReference ref = FirebaseDatabase
                 .getInstance(endpoint.getFirebaseApp())
                 .getReference(rootReference).child(firebaseKey);
+        final boolean reply = endpoint.isReply();
+        out.setHeaders(in.getHeaders());
+        if (reply) { // Wait for reply
+            processReply(exchange, callback, value, ref);
+        } else { // Fire and forget
+            ref.setValue(value);
+            out.setBody(in.getBody());
+            callback.done(true);
+        }
+        return !reply;
+    }
+
+    private void processReply(Exchange exchange, AsyncCallback callback, Object value, DatabaseReference ref) {
         ref.setValue(value, (DatabaseError databaseError, DatabaseReference databaseReference) -> {
             if (databaseError != null) {
                 exchange.setException(new DatabaseErrorException(databaseError));
-                exchange.getOut().setFault(true);
             } else {
                 exchange.getOut().setBody(databaseReference);
             }
-            callback.done(endpoint.isAsync());
+            callback.done(false);
         });
-        return endpoint.isAsync();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
index f0f2cc3..973b2ee 100644
--- a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
+++ b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
@@ -22,6 +22,9 @@ import java.nio.file.Paths;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static java.util.stream.IntStream.range;
+import static junit.framework.TestCase.fail;
+
 import com.google.firebase.database.DatabaseReference;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Message;
@@ -52,16 +55,28 @@ public class FirebaseProducerTest {
     }
 
     @Test
-    public void whenFirebaseSetShouldReceiveMessagesSync() throws Exception {
-        startRoute(false, DatabaseReference.class);
+    public void whenFirebaseSetShouldReceiveMessageAsDBReference() throws Exception {
+        startRoute(true, DatabaseReference.class);
     }
 
     @Test
-    public void whenFirebaseSetShouldReceiveMessagesAsync() throws Exception {
-        startRoute(true, String.class);
+    public void whenFirebaseSetShouldReceiveMessageAsDbString() throws Exception {
+        startRoute(false, String.class);
     }
 
-    private void startRoute(final boolean async, final Class<?> expectedBodyClass) throws Exception {
+    @Test
+    public void whenMultipleFirebaseSetShouldReceiveExpectedMessages() {
+        range(0, 10).forEach(i -> {
+            try {
+                startRoute(true, DatabaseReference.class);
+                startRoute(false, String.class);
+            } catch (Exception e) {
+                fail("Multiple test fails: " + e);
+            }
+        });
+    }
+
+    private void startRoute(final boolean reply, final Class<?> expectedBodyClass) throws Exception {
         sampleInputProvider.copySampleFile();
         CamelContext context = new DefaultCamelContext();
         context.addRoutes(new RouteBuilder() {
@@ -78,11 +93,14 @@ public class FirebaseProducerTest {
                             out.setHeader("firebaseKey", keyValue[0]);
                             out.setBody(keyValue[1].trim());
                         })
-                        .to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&async=%b",
-                                ConfigurationProvider.createDatabaseUrl(), rootReference, serviceAccountFile, async))
+                        .to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&reply=%b",
+                                ConfigurationProvider.createDatabaseUrl(), rootReference, serviceAccountFile, reply))
                         .to("log:whenFirebaseSet?level=WARN")
                         .process(exchange1 -> {
                             assertThat(exchange1.getIn().getBody().getClass()).isEqualTo(expectedBodyClass);
+                            if (reply) {
+                                assertThat(exchange1.getIn().getHeader("firebaseKey")).isNotNull();
+                            }
                             try {
                                 reentrantLock.lock();
                                 wake.signal();


Mime
View raw message