usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [22/39] usergrid git commit: index will merge all batches
Date Wed, 14 Oct 2015 16:54:18 GMT
index will merge all batches


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3ed08483
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3ed08483
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3ed08483

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 3ed0848352e2507ad1c3ba886c433f95657a5c24
Parents: 4c263b8
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Mon Sep 28 11:01:19 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Mon Sep 28 11:01:19 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 126 +++++++++++++------
 .../asyncevents/AsyncIndexProvider.java         |  12 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  16 ++-
 .../index/AmazonAsyncEventServiceTest.java      |   5 +-
 .../index/impl/IndexOperationMessage.java       |   5 +
 5 files changed, 115 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index fcb93c9..e1c6886 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +71,7 @@ import com.google.inject.Singleton;
 import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
+import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
 
@@ -85,6 +88,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private final QueueManager queue;
     private final QueueScope queueScope;
     private final IndexProcessorFig indexProcessorFig;
+    private final IndexProducer indexProducer;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
     private final EntityIndexFactory entityIndexFactory;
@@ -110,11 +114,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
     @Inject
-    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final
IndexProcessorFig indexProcessorFig,
-                                    final MetricsFactory metricsFactory,  final EntityCollectionManagerFactory
entityCollectionManagerFactory,
-                                    final IndexLocationStrategyFactory indexLocationStrategyFactory,
final EntityIndexFactory entityIndexFactory,
+    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
+                                    final IndexProcessorFig indexProcessorFig,
+                                    final IndexProducer indexProducer,
+                                    final MetricsFactory metricsFactory,
+                                    final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                    final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                    final EntityIndexFactory entityIndexFactory,
                                     final EventBuilder eventBuilder,
                                     final RxTaskScheduler rxTaskScheduler ) {
+        this.indexProducer = indexProducer;
 
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
@@ -219,43 +228,60 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
     private void handleMessages( final List<QueueMessage> messages ) {
-        if ( logger.isDebugEnabled() ) {
-            logger.debug( "handleMessages with {} message", messages.size() );
+        if (logger.isDebugEnabled()) {
+            logger.debug("handleMessages with {} message", messages.size());
         }
 
-        for ( QueueMessage message : messages ) {
-            final AsyncEvent event = ( AsyncEvent ) message.getBody();
+        Observable<IndexEventResult> merged = Observable.empty();
+        for (QueueMessage message : messages) {
+            final AsyncEvent event = (AsyncEvent) message.getBody();
 
-            logger.debug( "Processing {} event", event );
+            logger.debug("Processing {} event", event);
 
-            if ( event == null ) {
-                logger.error( "AsyncEvent type or event is null!" );
+            if (event == null) {
+                logger.error("AsyncEvent type or event is null!");
                 continue;
             }
 
 
-            if ( event instanceof EdgeDeleteEvent ) {
-                handleEdgeDelete( message );
-            }
-            else if ( event instanceof EdgeIndexEvent ) {
-                handleEdgeIndex( message );
+            if (event instanceof EdgeDeleteEvent) {
+               merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeDelete(queueMessage),
message));
+            } else if (event instanceof EdgeIndexEvent) {
+               merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeIndex(queueMessage),message));
+            } else if (event instanceof EntityDeleteEvent) {
+                merged = merged.mergeWith( callHandleIndex(queueMessage -> handleEntityDelete(queueMessage),message));
+            } else if (event instanceof EntityIndexEvent) {
+                merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEntityIndexUpdate(queueMessage),message));
+            } else if (event instanceof InitializeApplicationIndexEvent) {
+                //does not return observable
+                handleInitializeApplicationIndex(message);
+            } else {
+                logger.error("Unknown EventType: {}", event);
             }
 
-            else if ( event instanceof EntityDeleteEvent ) {
-                handleEntityDelete( message );
-            }
-            else if ( event instanceof EntityIndexEvent ) {
-                handleEntityIndexUpdate( message );
-            }
+            messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
+        }
 
-            else if ( event instanceof InitializeApplicationIndexEvent ) {
-                handleInitializeApplicationIndex( message );
-            }
-            else {
-                logger.error( "Unknown EventType: {}", event );
-            }
+        merged
+            .filter(indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage().isPresent())
+            .buffer(MAX_TAKE)
+            .flatMap(indexEventResults -> {
+                IndexOperationMessage combined = new IndexOperationMessage();
+                Observable.from(indexEventResults)
+                    .doOnNext(indexEventResult -> combined.injest(indexEventResult.getIndexOperationMessage().get())).subscribe();
+                indexProducer.put(combined).subscribe();
+                return Observable.from(indexEventResults);
+            })
+            .doOnNext(indexEventResult ->ack(indexEventResult.queueMessage));
+    }
 
-            messageCycle.update( System.currentTimeMillis() - event.getCreationTime() );
+    private Observable<IndexEventResult> callHandleIndex(Func1<QueueMessage,Observable<IndexOperationMessage>>
toCall, QueueMessage message){
+        try{
+            IndexOperationMessage indexOperationMessage =  toCall.call(message).toBlocking().lastOrDefault(null);
+            return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));
+        }catch (Exception e){
+            logger.error("failed to run index",e);
+            return Observable.just( new IndexEventResult(message, Optional.<IndexOperationMessage>absent(),false));
         }
     }
 
@@ -276,7 +302,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    public void handleEntityIndexUpdate(final QueueMessage message) {
+    public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage
message) {
 
         Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate"
);
 
@@ -298,8 +324,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope,
entityId, updatedAfter);
 
         final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex(
entityIndexOperation );
-
-        subscribeAndAck( observable, message );
+        return observable;
     }
 
 
@@ -313,7 +338,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         offer( operation );
     }
 
-    public void handleEdgeIndex(final QueueMessage message) {
+    public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message)
{
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeIndex");
 
@@ -333,8 +358,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(
entity -> eventBuilder.buildNewEdge(
             applicationScope, entity, edge ) );
-
-        subscribeAndAck( edgeIndexObservable, message );
+        return edgeIndexObservable;
     }
 
     @Override
@@ -344,7 +368,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         offer( new EdgeDeleteEvent( applicationScope, edge ) );
     }
 
-    public void handleEdgeDelete(final QueueMessage message) {
+    public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message)
{
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeDelete");
 
@@ -362,8 +386,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}",
applicationScope, edge);
 
         final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge(
applicationScope, edge );
-
-        subscribeAndAck( observable, message );
+        return observable;
     }
 
 
@@ -378,7 +401,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         return queue.getQueueDepth();
     }
 
-    public void handleEntityDelete(final QueueMessage message) {
+    public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage
message) {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
 
@@ -401,8 +424,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
             entityDeleteResults.getIndexObservable() );
-
-        subscribeAndAck( merged, message );
+        return merged;
     }
 
 
@@ -526,4 +548,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private void subscribeAndAck( final Observable<?> observable, final QueueMessage
message ){
        observable.doOnCompleted( ()-> ack(message)  ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler()
).subscribe();
     }
+    public static class IndexEventResult{
+        private final QueueMessage queueMessage;
+        private final Optional<IndexOperationMessage> indexOperationMessage;
+        private final boolean success;
+
+        public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage>
indexOperationMessage ,boolean success){
+
+            this.queueMessage = queueMessage;
+            this.indexOperationMessage = indexOperationMessage;
+            this.success = success;
+        }
+
+        public QueueMessage getQueueMessage() {
+            return queueMessage;
+        }
+
+        public boolean success() {
+            return success;
+        }
+
+        public Optional<IndexOperationMessage> getIndexOperationMessage() {
+            return indexOperationMessage;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 0e773cf..e9e36f0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
@@ -49,6 +50,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService>
{
     private final EventBuilder eventBuilder;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
     private final EntityIndexFactory entityIndexFactory;
+    private final IndexProducer indexProducer;
 
     private AsyncEventService asyncEventService;
 
@@ -61,7 +63,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService>
{
                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
                               final EventBuilder eventBuilder,
                               final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                              final EntityIndexFactory entityIndexFactory) {
+                              final EntityIndexFactory entityIndexFactory,
+                              final IndexProducer indexProducer) {
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
@@ -71,6 +74,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService>
{
         this.eventBuilder = eventBuilder;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.entityIndexFactory = entityIndexFactory;
+        this.indexProducer = indexProducer;
     }
 
 
@@ -92,12 +96,12 @@ public class AsyncIndexProvider implements Provider<AsyncEventService>
{
 
         switch (impl) {
             case LOCAL:
-                return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
+                return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
             case SQS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig,
metricsFactory,
+                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig,
indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory,
eventBuilder, rxTaskScheduler );
             case SNS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig,
metricsFactory,
+                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig,
indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory,
eventBuilder, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues()
+ " are allowed");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6a71b3e..fad6e48 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,14 +50,19 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
     private final EventBuilder eventBuilder;
     private final RxTaskScheduler rxTaskScheduler;
+    private final IndexProducer indexProducer;
     private final boolean resolveSynchronously;
 
 
     @Inject
-    public InMemoryAsyncEventService( final EventBuilder eventBuilder, final RxTaskScheduler
rxTaskScheduler, boolean
-        resolveSynchronously ) {
+    public InMemoryAsyncEventService( final EventBuilder eventBuilder,
+                                      final RxTaskScheduler rxTaskScheduler,
+                                      final IndexProducer indexProducer,
+                                      boolean resolveSynchronously
+    ) {
         this.eventBuilder = eventBuilder;
         this.rxTaskScheduler = rxTaskScheduler;
+        this.indexProducer = indexProducer;
         this.resolveSynchronously = resolveSynchronously;
     }
 
@@ -117,12 +124,13 @@ public class InMemoryAsyncEventService implements AsyncEventService
{
     }
 
     public void run( Observable<?> observable ) {
+        Observable mapped = observable.map(message -> message instanceof IndexOperationMessage
? indexProducer.put((IndexOperationMessage)message) : Observable.just(message));
         //start it in the background on an i/o thread
         if ( !resolveSynchronously ) {
-            observable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+            mapped.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).subscribe();
         }
         else {
-            observable.toBlocking().lastOrDefault(null);
+            mapped.subscribe();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 4660389..a14437c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.index;
 
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.junit.Rule;
 import org.junit.runner.RunWith;
 
@@ -72,6 +73,8 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
     @Inject
     public EventBuilder eventBuilder;
 
+    @Inject
+    public IndexProducer indexProducer;
 
     @Inject
     public IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -82,7 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory,
 entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,
rxTaskScheduler );
+        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer,
metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory,
eventBuilder, rxTaskScheduler );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ed08483/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index bd2bec8..0676314 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Optional;
 
 
 /**
@@ -106,4 +107,8 @@ public class IndexOperationMessage implements Serializable {
     public long getCreationTime() {
         return creationTime;
     }
+
+    public void injest(IndexOperationMessage singleMessage) {
+        si
+    }
 }


Mime
View raw message