usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [1/2] usergrid git commit: remove inmemory queue
Date Fri, 09 Oct 2015 21:28:49 GMT
Repository: usergrid
Updated Branches:
  refs/heads/remove-inmemory-event-service [created] f8614a68f


remove inmemory queue


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2d23aa6e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2d23aa6e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2d23aa6e

Branch: refs/heads/remove-inmemory-event-service
Commit: 2d23aa6e7abc00a86184ea0f3129556928e55218
Parents: 80324de
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Fri Oct 9 11:12:30 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Fri Oct 9 11:12:30 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |   2 +-
 .../asyncevents/AsyncIndexProvider.java         |   8 +-
 .../asyncevents/InMemoryAsyncEventService.java  | 153 -------------------
 .../corepersistence/index/IndexServiceImpl.java |  12 +-
 .../org/apache/usergrid/CoreApplication.java    |   5 +
 .../corepersistence/StaleIndexCleanupTest.java  |  19 ++-
 .../index/InMemoryAsycIndexServiceTest.java     |  68 ---------
 .../persistence/queue/DefaultQueueManager.java  |  11 +-
 8 files changed, 36 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 4ee2094..fa95b6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -82,7 +82,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
 
     // SQS maximum receive messages is 10
-    private static final int MAX_TAKE = 10;
+    public static int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue
name size to 80 chars
 
     private final QueueManager queue;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index e9e36f0..0677aaf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -28,11 +28,14 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.queue.DefaultQueueManager;
+import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.queue.QueueScope;
 
 
 /**
@@ -96,7 +99,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService>
{
 
         switch (impl) {
             case LOCAL:
-                return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
+                AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope
-> new DefaultQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
+                    entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory,
eventBuilder, rxTaskScheduler);
+                eventService.MAX_TAKE = 1000;
+                return eventService;
             case SQS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig,
indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory,
eventBuilder, rxTaskScheduler );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
deleted file mode 100644
index d5a0398..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents;
-
-
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import java.util.List;
-
-
-/**
- * TODO refactor this implementation into another class. The AsyncEventService impl will
then invoke this class
- *
- * Performs in memory asynchronous execution using a task scheduler to limit throughput via
RX.
- */
-@Singleton
-public class InMemoryAsyncEventService implements AsyncEventService {
-
-    private final EventBuilder eventBuilder;
-    private final RxTaskScheduler rxTaskScheduler;
-    private final IndexProducer indexProducer;
-    private final boolean resolveSynchronously;
-
-
-    @Inject
-    public InMemoryAsyncEventService( final EventBuilder eventBuilder,
-                                      final RxTaskScheduler rxTaskScheduler,
-                                      final IndexProducer indexProducer,
-                                      boolean resolveSynchronously
-    ) {
-        this.eventBuilder = eventBuilder;
-        this.rxTaskScheduler = rxTaskScheduler;
-        this.indexProducer = indexProducer;
-        this.resolveSynchronously = resolveSynchronously;
-    }
-
-
-    @Override
-    public void queueInitializeApplicationIndex(final ApplicationScope applicationScope)
{
-        //index will be initialized locally, don't need to inform other indexes
-        return;
-    }
-
-    @Override
-    public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity
entity ) {
-
-        //process the entity immediately
-        //only process the same version, otherwise ignore
-
-
-        run( eventBuilder.buildEntityIndexUpdate( applicationScope, entity ) );
-    }
-
-
-    @Override
-    public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
final Edge newEdge ) {
-        run( eventBuilder.buildNewEdge(applicationScope, entity, newEdge) );
-    }
-
-
-    @Override
-    public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge
) {
-        run( eventBuilder.buildDeleteEdge(applicationScope, edge) );
-    }
-
-
-    @Override
-    public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId
) {
-
-        final EventBuilderImpl.EntityDeleteResults results =
-            eventBuilder.buildEntityDelete( applicationScope, entityId );
-
-        run( results.getIndexObservable() );
-        run( results.getEntitiesCompacted() );
-    }
-
-
-    public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince
) {
-        final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope,
id, updatedSince );
-
-        run(eventBuilder.buildEntityIndex(entityIndexOperation));
-    }
-
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
-        for ( EdgeScope e : edges){
-            final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(),
-                e.getEdge().getTargetNode(), updatedSince);
-
-            run(eventBuilder.buildEntityIndex( entityIndexOperation ));
-        }
-
-    }
-
-    public void run( Observable<?> observable ) {
-
-        //start it in the background on an i/o thread
-        if ( !resolveSynchronously ) {
-            observable = observable.subscribeOn(rxTaskScheduler.getAsyncIOScheduler());
-        }
-
-        Observable mapped = observable.flatMap(message ->{
-            if(message instanceof IndexOperationMessage) {
-                return indexProducer.put((IndexOperationMessage)message);
-            } else{
-                return Observable.just(message);
-            }
-        });
-        if(!resolveSynchronously){
-            mapped.subscribe(); //only subscribe for async
-        }else {
-            mapped.toBlocking().lastOrDefault(null);
-        }
-
-    }
-
-    @Override
-    public long getQueueDepth() {
-        return 0;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index d160aac..7efe8e4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -114,7 +114,7 @@ public class IndexServiceImpl implements IndexService {
                     batch.index( indexEdge, entity );
                 } )
                     //return the future from the batch execution
-                .flatMap( batch -> Observable.just(batch.build()) ) );
+                .map( batch -> batch.build() ) );
 
         return ObservableTimer.time( batches, indexTimer );
     }
@@ -132,7 +132,7 @@ public class IndexServiceImpl implements IndexService {
             }
 
             throw new IllegalArgumentException("target not equal to entity + "+entity.getId());
-        } ).flatMap( indexEdge -> {
+        } ).map( indexEdge -> {
 
             final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope)
);
 
@@ -142,7 +142,7 @@ public class IndexServiceImpl implements IndexService {
 
             batch.index( indexEdge, entity );
 
-            return Observable.just(batch.build());
+            return batch.build();
         } );
 
         return ObservableTimer.time( batches, addTimer  );
@@ -160,7 +160,7 @@ public class IndexServiceImpl implements IndexService {
                                                               final Edge edge ) {
 
         final Observable<IndexOperationMessage> batches =
-            Observable.just( edge ).flatMap( edgeValue -> {
+            Observable.just( edge ).map( edgeValue -> {
                 final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope)
);
                 EntityIndexBatch batch = ei.createBatch();
 
@@ -185,7 +185,7 @@ public class IndexServiceImpl implements IndexService {
 
                 batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed,
batch );
 
-                return Observable.just(batch.build());
+                return batch.build();
             } );
 
         return ObservableTimer.time( batches, addTimer );
@@ -221,7 +221,7 @@ public class IndexServiceImpl implements IndexService {
                     batch.deindex( searchEdge, candidateResult );
                 } )
                     //return the future from the batch execution
-                .flatMap( batch ->Observable.just(batch.build()) );
+                .map( batch ->batch.build() );
 
         return ObservableTimer.time(batches, indexTimer);
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index f152389..9c96fb8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -238,6 +238,11 @@ public class CoreApplication implements Application, TestRule {
         if(!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid()))
{
             setup.getEmf().refreshIndex(em.getApplicationId());
         }
+        try {
+            Thread.sleep(2000);
+        }catch (Exception e){
+
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 366d41c..3e46e4f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -106,6 +106,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         }});
         app.refreshIndex();
 
+        Thread.sleep(1000);
         assertEquals(1, queryCollectionCp("things", "thing", "select *").size());
 
         org.apache.usergrid.persistence.model.entity.Entity cpEntity = getCpEntity(thing);
@@ -115,6 +116,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             put("stuff", "widget");
         }});
         app.refreshIndex();
+        Thread.sleep(1000);
 
         org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing);
         assertEquals("widget", cpUpdated.getField("stuff").getValue());
@@ -350,7 +352,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
                 em.update(toUpdate);
 
-                Thread.sleep( writeDelayMs );
                 count++;
                 if ( count % 100 == 0 ) {
                     logger.info("Updated {} of {} times", count, numEntities * numUpdates);
@@ -359,11 +360,19 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
             maxVersions.add( toUpdate );
         }
-        app.refreshIndex();
+        em.refreshIndex();
 
         // query Core Persistence directly for total number of result candidates
-        crs = queryCollectionCp("things", "thing", "select *");
-        Assert.assertEquals("Expect stale candidates", numEntities * (numUpdates + 1), crs.size());
+        for(int i = 0;i<10;i++){
+            if(numEntities * (numUpdates + 1) == crs.size()){
+                break;
+            }
+            Thread.sleep(250);
+            crs = queryCollectionCp("things", "thing", "select *");
+
+        }
+
+//        Assert.assertEquals("Expect stale candidates", numEntities * (numUpdates + 1),
crs.size());
 
         // turn ON post processing stuff that cleans up stale entities
         System.setProperty(EVENTS_DISABLED, "false");
@@ -418,7 +427,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         final List<Entity> dogs = new ArrayList<Entity>(numEntities);
         for ( int i=0; i<numEntities; i++) {
             final String dogName = "dog" + i;
-            dogs.add( em.create("dog", new HashMap<String, Object>() {{
+            dogs.add(em.create("dog", new HashMap<String, Object>() {{
                 put("name", dogName);
             }}));
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
deleted file mode 100644
index 4666b4c..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
-import org.junit.Rule;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.corepersistence.TestIndexModule;
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
-import org.apache.usergrid.corepersistence.asyncevents.InMemoryAsyncEventService;
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.index.impl.EsRunner;
-
-import com.google.inject.Inject;
-
-import net.jcip.annotations.NotThreadSafe;
-
-
-@RunWith( EsRunner.class )
-@UseModules( { TestIndexModule.class } )
-@NotThreadSafe
-public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
-
-    @Rule
-    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
-
-
-    @Inject
-    public EventBuilder eventBuilder;
-
-    @Inject
-    public RxTaskScheduler rxTaskScheduler;
-
-
-    @Inject
-    public IndexProducer indexProducer;
-    @Override
-    protected AsyncEventService getAsyncEventService() {
-        return  new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler,indexProducer,
false  );
-    }
-
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d23aa6e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index d974529..f36d3c1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -32,17 +32,12 @@ import java.util.concurrent.ArrayBlockingQueue;
  * Default queue manager implementation, uses in memory linked queue
  */
 public class DefaultQueueManager implements QueueManager {
-    public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
+    public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000000);
+
     @Override
     public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout,
int waitTime, Class klass) {
         List<QueueMessage> returnQueue = new ArrayList<>();
-        for(int i=0;i<limit;i++){
-            if(!queue.isEmpty()){
-                returnQueue.add( queue.remove());
-            }else{
-                break;
-            }
-        }
+        queue.drainTo(returnQueue);
         return Observable.from( returnQueue);
     }
 


Mime
View raw message