usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [25/39] usergrid git commit: fix observables
Date Wed, 14 Oct 2015 16:54:21 GMT
fix observables


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a9dc6ba2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a9dc6ba2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a9dc6ba2

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: a9dc6ba22e028dcae00cb03b210f12f1a598583b
Parents: 0428cd6
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Mon Sep 28 14:51:29 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Mon Sep 28 14:51:29 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/InMemoryAsyncEventService.java    | 16 +++++++++++-----
 .../corepersistence/index/IndexServiceTest.java   | 18 +++++++++++++-----
 .../index/impl/IndexRefreshCommandImpl.java       |  2 +-
 .../rest/exceptions/AbstractExceptionMapper.java  |  2 +-
 4 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index fad6e48..b29c39e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -124,14 +124,20 @@ public class InMemoryAsyncEventService implements AsyncEventService
{
     }
 
     public void run( Observable<?> observable ) {
-        Observable mapped = observable.map(message -> message instanceof IndexOperationMessage
? indexProducer.put((IndexOperationMessage)message) : Observable.just(message));
+
         //start it in the background on an i/o thread
         if ( !resolveSynchronously ) {
-            mapped.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).subscribe();
-        }
-        else {
-            mapped.subscribe();
+            observable = observable.subscribeOn(rxTaskScheduler.getAsyncIOScheduler());
         }
+
+        Observable mapped = observable.flatMap(message ->{
+            if(message instanceof IndexOperationMessage) {
+                return indexProducer.put((IndexOperationMessage)message);
+            } else{
+                return Observable.just(message);
+            }
+        });
+        mapped.subscribe();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 50c5501..6001dd4 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -80,6 +81,9 @@ public class IndexServiceTest {
     public GraphManagerFactory graphManagerFactory;
 
     @Inject
+    public IndexProducer indexProducer;
+
+    @Inject
     public EntityCollectionManagerFactory entityCollectionManagerFactory;
 
     @Inject
@@ -121,6 +125,7 @@ public class IndexServiceTest {
 
         //real users should never call to blocking, we're not sure what we'll get
         final IndexOperationMessage results = indexed.toBlocking().last();
+        indexProducer.put(results).subscribe();
 
         final Set<IndexOperation> indexRequests = results.getIndexRequests();
 
@@ -170,7 +175,8 @@ public class IndexServiceTest {
 
 
         //now index
-        final int batches = indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+        final int batches = indexService.indexEntity( applicationScope, testEntity )
+            .flatMap(mesage -> indexProducer.put(mesage)).count().toBlocking().last();
 
 
         assertEquals(1, batches);
@@ -255,7 +261,8 @@ public class IndexServiceTest {
 
 
         //now index
-        final int batches = indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+        final int batches = indexService.indexEntity( applicationScope, testEntity )
+            .flatMap(mesage -> indexProducer.put(mesage)).count().toBlocking().last();
 
         //take our edge count + 1 and divided by batch sizes
         final int expectedSize = ( int ) Math.ceil( ( (double)edgeCount + 1 ) / indexFig.getIndexBatchSize()
);
@@ -372,7 +379,7 @@ public class IndexServiceTest {
 
         final List<Edge> connectionSearchEdges = createConnectionSearchEdges( testEntity,
graphManager, edgeCount );
 
-        indexService.indexEntity( applicationScope, testEntity ).toBlocking().getIterator();
+        indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator();
 
         //query until results are available for collections
         final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
collectionEdge );
@@ -396,7 +403,7 @@ public class IndexServiceTest {
 
             //step 2
             IndexOperationMessage indexOperationMessage =
-                indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ).toBlocking().lastOrDefault(
null );
+                indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ) .flatMap(mesage
->indexProducer.put(mesage)).toBlocking().lastOrDefault( null );
 
             //not sure if this is still valid.
             assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
@@ -436,7 +443,8 @@ public class IndexServiceTest {
         final Edge connectionSearch = graphManager.writeEdge( connectionEdge ).toBlocking().last();
 
         //now index
-        indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+        indexService.indexEntity( applicationScope, testEntity)
+            .flatMap(mesage ->indexProducer.put(mesage)).count().toBlocking().last();
 
         //query until results are available for collections
         final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
collectionEdge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 01942a8..087eefe 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -193,7 +193,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
                 //delete the item
                 IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
                 indexOperationMessage.addDeIndexRequest( deIndexRequest );
-                producer.put( indexOperationMessage );
+                producer.put( indexOperationMessage ).subscribe();
             } );
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a9dc6ba2/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
index 1dbffbd..a359618 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/AbstractExceptionMapper.java
@@ -76,7 +76,7 @@ public abstract class AbstractExceptionMapper<E extends java.lang.Throwable>
imp
                 logger.debug(e.getClass().getCanonicalName() + " Server Error (" + status
+ ")", e);
             }
             switch (status){
-                case 200 : logger.info("Uncaught Exception", e); break;
+                case 200 : logger.debug("Uncaught Exception", e); break;
                 default: logger.error("Uncaught Exception", e);
             }
         }


Mime
View raw message