usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [2/4] usergrid git commit: Change read repair to interact with c* directly and only fire and index operation message to get the ES document removed from all regions.
Date Tue, 27 Oct 2015 20:41:05 GMT
Change read repair to interact with c* directly and only fire and index operation message to
get the ES document removed from all regions.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1b43bda3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1b43bda3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1b43bda3

Branch: refs/heads/2.1-release
Commit: 1b43bda3f801172b0e59f927ff3ae52a559d36cc
Parents: 70d7a95
Author: Michael Russo <michaelarusso@gmail.com>
Authored: Tue Oct 27 10:50:16 2015 -0700
Committer: Michael Russo <michaelarusso@gmail.com>
Committed: Tue Oct 27 12:56:13 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventService.java          |  5 ++
 .../asyncevents/InMemoryAsyncEventService.java  |  5 ++
 .../read/traverse/AbstractReadGraphFilter.java  | 69 +++++++++++++++++---
 .../traverse/ReadGraphCollectionFilter.java     | 10 ++-
 .../traverse/ReadGraphConnectionFilter.java     | 10 ++-
 .../impl/stage/NodeDeleteListenerImpl.java      | 27 ++++----
 6 files changed, 97 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index dcfffcb..dbf8996 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -77,6 +77,11 @@ public interface AsyncEventService extends ReIndexAction {
      */
     void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId);
 
+    /**
+     *
+     * @param indexOperationMessage
+     */
+    void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
 
     /**
      * current queue depth

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index fc6385c..d8334b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -105,6 +105,11 @@ public class InMemoryAsyncEventService implements AsyncEventService {
         run( results.getCompactedNode() );
     }
 
+    @Override
+    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage){
+        //this is not used locally
+    }
+
 
     public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince
) {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope,
id, updatedSince );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 9d050c8..89230d7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,7 +20,11 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +46,7 @@ import com.google.common.base.Optional;
 import rx.Observable;
 
 
+
 /**
  * Command for reading graph edges
  */
@@ -51,15 +56,21 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
 
     private final GraphManagerFactory graphManagerFactory;
     private final RxTaskScheduler rxTaskScheduler;
+    private final EventBuilder eventBuilder;
+    private final AsyncEventService asyncEventService;
 
 
     /**
      * Create a new instance of our command
      */
     public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
-                                    final RxTaskScheduler rxTaskScheduler) {
+                                    final RxTaskScheduler rxTaskScheduler,
+                                    final EventBuilder eventBuilder,
+                                    final AsyncEventService asyncEventService ) {
         this.graphManagerFactory = graphManagerFactory;
         this.rxTaskScheduler = rxTaskScheduler;
+        this.eventBuilder = eventBuilder;
+        this.asyncEventService = asyncEventService;
     }
 
 
@@ -107,28 +118,56 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
                 final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
 
 
+                if (isDeleted) {
 
-                if(isDeleted){
                     logger.trace("Edge {} is deleted, deleting the edge", markedEdge);
-                    graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                    final Observable<IndexOperationMessage> indexMessageObservable
= eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+
+                    indexMessageObservable
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
+
                 }
 
-                if(isSourceNodeDeleted){
-                    final Id sourceNodeId = markedEdge.getSourceNode();
+                if (isSourceNodeDeleted) {
 
+                    final Id sourceNodeId = markedEdge.getSourceNode();
                     logger.trace("Edge {} has a deleted source node, deleting the entity
for id {}", markedEdge, sourceNodeId);
-                    graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+
+                    final EventBuilderImpl.EntityDeleteResults
+                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope,
sourceNodeId);
+
+                    entityDeleteResults.getIndexObservable()
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
+
+                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+                        entityDeleteResults.getCompactedNode())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+                        subscribe();
+
                 }
 
-                if(isTargetNodeDelete){
+                if (isTargetNodeDelete) {
 
                     final Id targetNodeId = markedEdge.getTargetNode();
+                    logger.trace("Edge {} has a deleted target node, deleting the entity
for id {}", markedEdge, targetNodeId);
 
-                    logger.trace("Edge {} has a deleted target node, deleting the entity
for id {}", markedEdge, targetNodeId );
-                    graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                    final EventBuilderImpl.EntityDeleteResults
+                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope,
targetNodeId);
+
+                    entityDeleteResults.getIndexObservable()
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
+
+                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+                        entityDeleteResults.getCompactedNode())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+                        subscribe();
+
                 }
 
 
@@ -202,4 +241,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
             return cursorEdge;
         }
     }
+
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector()
{
+
+        return observable -> observable
+            .filter((IndexOperationMessage msg) -> !msg.isEmpty())
+            .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+            .doOnNext(indexOperation -> {
+                asyncEventService.queueIndexOperationMessage(indexOperation);
+            });
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index 1d63bc6..3d7df3b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
@@ -41,8 +43,12 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter
{
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final
RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) {
-        super( graphManagerFactory, rxTaskScheduler );
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
+                                      final RxTaskScheduler rxTaskScheduler,
+                                      final EventBuilder eventBuilder,
+                                      final AsyncEventService asyncEventService,
+                                      @Assisted final String collectionName ) {
+        super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
         this.collectionName = collectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index efe94db..b2d368b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
@@ -41,8 +43,12 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter
{
      * Create a new instance of our command
      */
     @Inject
-    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final
RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) {
-        super( graphManagerFactory, rxTaskScheduler );
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
+                                      final RxTaskScheduler rxTaskScheduler,
+                                      final EventBuilder eventBuilder,
+                                      final AsyncEventService asyncEventService,
+                                      @Assisted final String connectionName ) {
+        super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
         this.connectionName = connectionName;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index e4eb5fc..343cc77 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -157,26 +157,26 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
 
         //get all edges pointing to the target node and buffer then into groups for deletion
         Observable<MarkedEdge> targetEdges =
-                getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null, null
) )
-                        .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create(
new ObservableIterator<MarkedEdge>( "getTargetEdges" ) {
+                getEdgesTypesToTarget(scope, new SimpleSearchEdgeType(node, null, null))
+                        .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getTargetEdges")
{
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
-                                return storageSerialization.getEdgesToTarget( scope,
-                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion,
SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
+                                return storageSerialization.getEdgesToTarget(scope,
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion,
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
                             }
-                        } ) );
+                        }));
 
 
         //get all edges pointing to the source node and buffer them into groups for deletion
         Observable<MarkedEdge> sourceEdges =
-                getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null, null
) )
-                        .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create(
new ObservableIterator<MarkedEdge>( "getSourceEdges" ) {
+                getEdgesTypesFromSource(scope, new SimpleSearchEdgeType(node, null, null))
+                        .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getSourceEdges")
{
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
-                                return storageSerialization.getEdgesFromSource( scope,
-                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion,
SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
+                                return storageSerialization.getEdgesFromSource(scope,
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion,
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
                             }
-                        } ) );
+                        }));
 
         //merge both source and target into 1 observable.  We'll need to check them all regardless
of order
         return Observable.merge( targetEdges, sourceEdges )
@@ -235,12 +235,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
 
                     //run both the source/target edge type cleanup, then proceed
                     return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault(
null )
-                                     .flatMap( new Func1<Integer, Observable<MarkedEdge>>()
{
-                                         @Override
-                                         public Observable<MarkedEdge> call( final
Integer integer ) {
-                                             return Observable.from( markedEdges );
-                                         }
-                                     } );
+                                     .flatMap(integer -> Observable.from( markedEdges
));
                 } );
     }
 


Mime
View raw message