usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [4/9] usergrid git commit: fix subscribe for messages
Date Tue, 06 Oct 2015 20:35:02 GMT
fix subscribe for messages


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1a1d42e1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1a1d42e1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1a1d42e1

Branch: refs/heads/2.1-release
Commit: 1a1d42e1f53cabf433442c17f614f9fcae418a22
Parents: b437f61
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Mon Oct 5 16:28:04 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Mon Oct 5 16:28:04 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1a1d42e1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e215d48..bf29c5a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -285,13 +285,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
                 //ack after successful completion of the operation.
-                return indexProducer.put( combined )
-                    .flatMap( operationResult -> Observable.from( indexEventResults ) )
-                    //ack each message, but only if we didn't error.  If we did, we'll want to log it and
-                    .map( indexEventResult -> {
-                                        ack( indexEventResult.queueMessage );
-                                        return indexEventResult;
-                                    } );
+                return indexProducer.put(combined)
+                    .flatMap(operationResult -> Observable.from(indexEventResults));
+
             } );
 
     }
@@ -538,7 +534,15 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         }
                     })
                             //this won't block our read loop, just reads and proceeds
-                            .flatMap( messages -> handleMessages( messages ) ).subscribeOn( Schedulers.newThread() );
+                            .map(messages ->
+                                    handleMessages(messages)
+                                        .map(indexEventResult -> {
+                                            ack( indexEventResult.getQueueMessage() );
+                                            return indexEventResult;
+                                        })
+                                        .toBlocking().lastOrDefault(null)
+                            )//ack each message, but only if we didn't error.  If we did, we'll want to log it and
+                            .subscribeOn( Schedulers.newThread() );
 
             //start in the background
 


Mime
View raw message