usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject usergrid git commit: refactor observable
Date Tue, 06 Oct 2015 14:27:08 GMT
Repository: usergrid
Updated Branches:
  refs/heads/review-observable 890611de9 -> 17586ecb5


refactor observable


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/17586ecb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/17586ecb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/17586ecb

Branch: refs/heads/review-observable
Commit: 17586ecb5dd38ee71b7edfd256fdecb50e615556
Parents: 890611d
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Tue Oct 6 08:26:46 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Tue Oct 6 08:26:46 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/17586ecb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 37a9da5..e16de05 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -227,7 +227,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    private Observable<IndexEventResult> handleMessages( final List<QueueMessage> messages ) {
+    private Observable<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
         if (logger.isDebugEnabled()) {
             logger.debug("handleMessages with {} message", messages.size());
         }
@@ -290,7 +290,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
                 //ack after successful completion of the operation.
                 return indexProducer.put(combined)
-                    .flatMap(operationResult -> Observable.from(indexEventResults));
+                    .flatMap(operationResult -> Observable.from(indexEventResults))
+                    .map(result -> result.getQueueMessage());
 
             });
 
@@ -448,7 +449,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final Observable<IndexOperationMessage> merged = entityDeleteResults
             .getEntitiesCompacted()
-            .collect(() -> new ArrayList<>(),(list,item)-> list.add(item))
+            .collect(() -> new ArrayList<>(), (list, item) -> list.add(item))
             .flatMap(collected -> entityDeleteResults.getIndexObservable()) ;
         return merged;
     }
@@ -499,7 +500,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private void startWorker() {
         synchronized (mutex) {
 
-            Observable<IndexEventResult> consumer =
+            Observable<QueueMessage> consumer =
                     Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
                         @Override
                         public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
@@ -540,14 +541,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         }
                     })
                             //this won't block our read loop, just reads and proceeds
-                            .map(messages ->
+                            .flatMap(messages ->
                                     handleMessages(messages)
-                                        .map(indexEventResult -> {
-                                            ack(indexEventResult.getQueueMessage());
-                                            return indexEventResult;
+                                        .doOnNext(message -> {
+                                            //ack each message, but only if we didn't error.
+                                            ack(message);
                                         })
-                                        .toBlocking().lastOrDefault(null)
-                            )//ack each message, but only if we didn't error.  If we did, we'll want to log it and
+                            )
                             .subscribeOn(Schedulers.newThread());
 
             //start in the background


Mime
View raw message