usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [27/39] usergrid git commit: rewrite observable
Date Wed, 14 Oct 2015 16:54:23 GMT
rewrite observable


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d73f98db
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d73f98db
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d73f98db

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: d73f98dbfaec511c74c7801ff046755f29da3a69
Parents: 0c4c1ab
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Mon Sep 28 15:59:48 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Mon Sep 28 15:59:48 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 81 ++++++++++++--------
 1 file changed, 50 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d73f98db/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 5f681e7..50b210e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -232,56 +232,75 @@ public class AmazonAsyncEventService implements AsyncEventService {
             logger.debug("handleMessages with {} message", messages.size());
         }
 
-        Observable<IndexEventResult> merged = Observable.empty();
-        for (QueueMessage message : messages) {
+        Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
             final AsyncEvent event = (AsyncEvent) message.getBody();
 
             logger.debug("Processing {} event", event);
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                continue;
+                return Observable.empty();
             }
+            try {
+                //merge each operation to a master observable;
+                if (event instanceof EdgeDeleteEvent) {
+                    return handleIndexOperation(message, queueMessage -> handleEdgeDelete(queueMessage));
+                } else if (event instanceof EdgeIndexEvent) {
+                    return handleIndexOperation(message, queueMessage -> handleEdgeIndex(queueMessage));
+                } else if (event instanceof EntityDeleteEvent) {
+                    return handleIndexOperation(message, queueMessage -> handleEntityDelete(queueMessage));
+                } else if (event instanceof EntityIndexEvent) {
+                    return handleIndexOperation(message, queueMessage -> handleEntityIndexUpdate(queueMessage));
+                } else if (event instanceof InitializeApplicationIndexEvent) {
+                    //does not return observable
+                    handleInitializeApplicationIndex(message);
+                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+                } else {
+                    logger.error("Unknown EventType: {}", event);
+                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+                }
+            }catch (Exception e){
+                logger.error("Failed to index entity", e,message);
+                return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+            }finally {
+                messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
 
-            if (event instanceof EdgeDeleteEvent) {
-               merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeDelete(queueMessage)));
-            } else if (event instanceof EdgeIndexEvent) {
-               merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEdgeIndex(queueMessage)));
-            } else if (event instanceof EntityDeleteEvent) {
-                merged = merged.mergeWith( callHandleIndex(message, queueMessage -> handleEntityDelete(queueMessage)));
-            } else if (event instanceof EntityIndexEvent) {
-                merged = merged.mergeWith(callHandleIndex(message, queueMessage -> handleEntityIndexUpdate(queueMessage)));
-            } else if (event instanceof InitializeApplicationIndexEvent) {
-                //does not return observable
-                handleInitializeApplicationIndex(message);
-            } else {
-                logger.error("Unknown EventType: {}", event);
             }
+        });
 
-            messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
-        }
-
-        merged
+        masterObservable
+            //remove unsuccessful
             .filter(indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage().isPresent())
+            //take the max
             .buffer(MAX_TAKE)
-            .flatMap(indexEventResults -> {
+            //map them to index results and return them
+            .map(indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
-                Observable.from(indexEventResults)
-                    .doOnNext(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get())).subscribe();
-                indexProducer.put(combined).subscribe();
-                return Observable.from(indexEventResults);
+                indexEventResults.stream()
+                    .forEach(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get()));
+                indexProducer.put(combined).subscribe();//execute the index operation
+                return indexEventResults;
+            })
+                //flat map the ops so they are back to individual
+            .flatMap(indexEventResults -> Observable.from(indexEventResults))
+            //ack each message
+            .map(indexEventResult -> {
+                ack(indexEventResult.queueMessage);
+                return indexEventResult;
             })
-            .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage))
             .subscribe();
     }
 
-    private Observable<IndexEventResult> callHandleIndex(QueueMessage message, Func1<QueueMessage, Observable<IndexOperationMessage>> toCall){
+    //transform index operation to
+    private Observable<IndexEventResult> handleIndexOperation(QueueMessage queueMessage,
+                                                              Func1<QueueMessage, Observable<IndexOperationMessage>> operation
+    ){
         try{
-            IndexOperationMessage indexOperationMessage =  toCall.call(message).toBlocking().lastOrDefault(null);
-            return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));
+            return operation.call(queueMessage)
+                .map(indexOperationMessage -> new IndexEventResult(queueMessage, Optional.fromNullable(indexOperationMessage), true));
         }catch (Exception e){
             logger.error("failed to run index",e);
-            return Observable.just( new IndexEventResult(message, Optional.<IndexOperationMessage>absent(),false));
+            return Observable.just( new IndexEventResult(queueMessage, Optional.<IndexOperationMessage>absent(),false));
         }
     }
 
@@ -548,7 +567,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){
        observable.doOnCompleted( ()-> ack(message)  ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
     }
-    public static class IndexEventResult{
+    public class IndexEventResult{
         private final QueueMessage queueMessage;
         private final Optional<IndexOperationMessage> indexOperationMessage;
         private final boolean success;


Mime
View raw message