Fixed the producer which now should be handling the async and sync flows in an improved way.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0bd0f74
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0bd0f74
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0bd0f74
Branch: refs/heads/master
Commit: d0bd0f7449f63893b0f33a3444d91a4a8e0f1acb
Parents: 8757400
Author: gilfernandes <gil.fernandes@gmail.com>
Authored: Tue Nov 22 15:04:17 2016 +0000
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Fri Nov 25 10:05:36 2016 +0100
----------------------------------------------------------------------
.../component/firebase/FirebaseEndpoint.java | 14 ++++-----
.../component/firebase/FirebaseProducer.java | 31 +++++++++----------
.../firebase/FirebaseProducerTest.java | 32 +++++++++++++++-----
3 files changed, 47 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
index 709785e..41d7a61 100644
--- a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
+++ b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
@@ -52,10 +52,10 @@ public class FirebaseEndpoint extends DefaultEndpoint {
@Metadata(required = "false")
private String keyName = "firebaseKey";
- @UriParam(defaultValue = "async", description = "If true, the save or update request
(set value in Firebase terms) "
+ @UriParam(defaultValue = "reply", description = "If true, the save or update request
(set value in Firebase terms) "
+ "is fired and the reply will be ignored, else the routing thread will wait
and the reply will be saved in the exchange message")
@Metadata(required = "false")
- private boolean async;
+ private boolean reply;
public FirebaseEndpoint(String uri, FirebaseComponent firebaseComponent, FirebaseConfig
firebaseConfig) {
super(uri, firebaseComponent);
@@ -64,7 +64,7 @@ public class FirebaseEndpoint extends DefaultEndpoint {
this.setServiceAccountFile(firebaseConfig.getServiceAccountFile());
this.databaseUrl = firebaseConfig.getDatabaseUrl();
final String keyName = firebaseConfig.getKeyName();
- this.setAsync(firebaseConfig.isAsync());
+ this.setReply(firebaseConfig.isAsync());
if (keyName != null) {
this.setKeyName(keyName);
}
@@ -110,12 +110,12 @@ public class FirebaseEndpoint extends DefaultEndpoint {
this.keyName = keyName;
}
- public boolean isAsync() {
- return async;
+ public boolean isReply() {
+ return reply;
}
- public void setAsync(boolean async) {
- this.async = async;
+ public void setReply(boolean reply) {
+ this.reply = reply;
}
public FirebaseApp getFirebaseApp() {
http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
index ef8f8a6..33ba39b 100644
--- a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
+++ b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
@@ -22,7 +22,6 @@ import com.google.firebase.database.FirebaseDatabase;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.Processor;
import org.apache.camel.component.firebase.exception.DatabaseErrorException;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.slf4j.Logger;
@@ -43,35 +42,35 @@ public class FirebaseProducer extends DefaultAsyncProducer {
rootReference = endpoint.getRootReference();
}
- /**
- * Processes the message exchange.
- * Similar to {@link Processor#process}, but the caller supports having the exchange
asynchronously processed.
- * <p/>
- * If there was a failure processing then the caused {@link Exception} would be set on
the {@link Exchange}.
- *
- * @param exchange the message exchange
- * @param callback the {@link AsyncCallback} will be invoked when the processing of the
exchange is completed.
- * If the exchange is completed synchronously, then the callback is also
invoked synchronously.
- * The callback should therefore be careful of starting recursive loop.
- * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt>
to continue being executed asynchronously
- */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
final Message in = exchange.getIn();
+ final Message out = exchange.getOut();
String firebaseKey = (String) in.getHeader(endpoint.getKeyName());
Object value = in.getBody();
DatabaseReference ref = FirebaseDatabase
.getInstance(endpoint.getFirebaseApp())
.getReference(rootReference).child(firebaseKey);
+ final boolean reply = endpoint.isReply();
+ out.setHeaders(in.getHeaders());
+ if (reply) { // Wait for reply
+ processReply(exchange, callback, value, ref);
+ } else { // Fire and forget
+ ref.setValue(value);
+ out.setBody(in.getBody());
+ callback.done(true);
+ }
+ return !reply;
+ }
+
+ private void processReply(Exchange exchange, AsyncCallback callback, Object value, DatabaseReference
ref) {
ref.setValue(value, (DatabaseError databaseError, DatabaseReference databaseReference)
-> {
if (databaseError != null) {
exchange.setException(new DatabaseErrorException(databaseError));
- exchange.getOut().setFault(true);
} else {
exchange.getOut().setBody(databaseReference);
}
- callback.done(endpoint.isAsync());
+ callback.done(false);
});
- return endpoint.isAsync();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
index f0f2cc3..973b2ee 100644
--- a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
+++ b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
@@ -22,6 +22,9 @@ import java.nio.file.Paths;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import static java.util.stream.IntStream.range;
+import static junit.framework.TestCase.fail;
+
import com.google.firebase.database.DatabaseReference;
import org.apache.camel.CamelContext;
import org.apache.camel.Message;
@@ -52,16 +55,28 @@ public class FirebaseProducerTest {
}
@Test
- public void whenFirebaseSetShouldReceiveMessagesSync() throws Exception {
- startRoute(false, DatabaseReference.class);
+ public void whenFirebaseSetShouldReceiveMessageAsDBReference() throws Exception {
+ startRoute(true, DatabaseReference.class);
}
@Test
- public void whenFirebaseSetShouldReceiveMessagesAsync() throws Exception {
- startRoute(true, String.class);
+ public void whenFirebaseSetShouldReceiveMessageAsDbString() throws Exception {
+ startRoute(false, String.class);
}
- private void startRoute(final boolean async, final Class<?> expectedBodyClass)
throws Exception {
+ @Test
+ public void whenMultipleFirebaseSetShouldReceiveExpectedMessages() {
+ range(0, 10).forEach(i -> {
+ try {
+ startRoute(true, DatabaseReference.class);
+ startRoute(false, String.class);
+ } catch (Exception e) {
+ fail("Multiple test fails: " + e);
+ }
+ });
+ }
+
+ private void startRoute(final boolean reply, final Class<?> expectedBodyClass)
throws Exception {
sampleInputProvider.copySampleFile();
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
@@ -78,11 +93,14 @@ public class FirebaseProducerTest {
out.setHeader("firebaseKey", keyValue[0]);
out.setBody(keyValue[1].trim());
})
- .to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&async=%b",
- ConfigurationProvider.createDatabaseUrl(), rootReference,
serviceAccountFile, async))
+ .to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&reply=%b",
+ ConfigurationProvider.createDatabaseUrl(), rootReference,
serviceAccountFile, reply))
.to("log:whenFirebaseSet?level=WARN")
.process(exchange1 -> {
assertThat(exchange1.getIn().getBody().getClass()).isEqualTo(expectedBodyClass);
+ if (reply) {
+ assertThat(exchange1.getIn().getHeader("firebaseKey")).isNotNull();
+ }
try {
reentrantLock.lock();
wake.signal();
|