usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [08/19] git commit: Finished updating back to executor pool
Date Wed, 01 Oct 2014 20:01:07 GMT
Finished updating back to executor pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/10604788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/10604788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/10604788

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: 1060478811e928c45b15ff5b96cd4b9888671d77
Parents: af3726b
Author: Todd Nine <toddnine@apache.org>
Authored: Mon Sep 29 09:20:26 2014 -0600
Committer: Todd Nine <toddnine@apache.org>
Committed: Mon Sep 29 09:20:26 2014 -0600

----------------------------------------------------------------------
 .../impl/EntityVersionCleanupTask.java          | 81 ++++++++++----------
 .../impl/EntityVersionCleanupTaskTest.java      | 55 +++++++------
 .../core/task/NamedTaskExecutorImpl.java        | 20 +++--
 .../usergrid/persistence/core/task/Task.java    | 22 +-----
 .../core/task/NamedTaskExecutorImplTest.java    | 12 +--
 .../shard/impl/ShardGroupCompactionImpl.java    | 40 ++--------
 6 files changed, 96 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 3d1483d..d7ece40 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -2,10 +2,8 @@ package org.apache.usergrid.persistence.collection.impl;
 
 
 import java.util.List;
-import java.util.Stack;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -20,12 +18,17 @@ import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIte
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
 
 /**
  * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
  * retained, the range is exclusive.
  */
-public class EntityVersionCleanupTask extends Task<Void> {
+public class EntityVersionCleanupTask implements Task<Void> {
 
     private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
 
@@ -66,21 +69,23 @@ public class EntityVersionCleanupTask extends Task<Void> {
 
 
     @Override
-    public void rejected() {
+    public Void rejected() {
         //Our task was rejected meaning our queue was full.  We need this operation to run,
         // so we'll run it in our current thread
 
         try {
-            executeTask();
+            call();
         }
         catch ( Exception e ) {
             throw new RuntimeException( "Exception thrown in call task", e );
         }
+
+        return null;
     }
 
 
     @Override
-    public Void executeTask() throws Exception {
+    public Void call() throws Exception {
 
 
         final UUID maxVersion = version;
@@ -117,54 +122,46 @@ public class EntityVersionCleanupTask extends Task<Void> {
 
     private void fireEvents() throws ExecutionException, InterruptedException {
 
-        if ( listeners.size() == 0 ) {
+        final int listenerSize = listeners.size();
+
+        if ( listenerSize == 0 ) {
             return;
         }
 
-        //stack to track forked tasks
-        final Stack<RecursiveTask<Void>> tasks = new Stack<>();
-
-
-        //we don't want to fork the final listener, we'll run that in our current thread
-        final int forkedTaskSize = listeners.size() - 1;
-
-
-        //execute all the listeners
-        for ( int i = 0; i < forkedTaskSize; i++ ) {
-
-            final EntityVersionDeleted listener = listeners.get( i );
-
-            final RecursiveTask<Void> task = createTask( listener );
-
-            task.fork();
-
-            tasks.push( task );
+        if ( listenerSize == 1 ) {
+            listeners.get( 0 ).versionDeleted( scope, entityId, version );
+            return;
         }
 
+        LOG.debug( "Started firing {} listeners", listenerSize );
 
-        final RecursiveTask<Void> lastTask = createTask( listeners.get( forkedTaskSize ) );
+        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
+        Observable.from( listeners )
+                  .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
 
-        lastTask.invoke();
+                      @Override
+                      public Observable<EntityVersionDeleted> call(
+                              final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
 
+                          return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
+                              @Override
+                              public void call( final EntityVersionDeleted listener ) {
+                                  listener.versionDeleted( scope, entityId, version );
+                              }
+                          } );
+                      }
+                  }, Schedulers.io() ).toBlocking().last();
 
-        //wait for them to complete
-        while ( !tasks.isEmpty() ) {
-            tasks.pop().get();
-        }
+        LOG.debug( "Finished firing {} listeners", listenerSize );
     }
 
 
-    /**
-     * Return the new task to execute
-     */
-    private RecursiveTask<Void> createTask( final EntityVersionDeleted listener ) {
-        return new RecursiveTask<Void>() {
-            @Override
-            protected Void compute() {
-                listener.versionDeleted( scope, entityId, version );
-                return null;
-            }
-        };
+    private static interface ListenerRunner {
+
+        /**
+         * Run the listeners
+         */
+        public void runListeners();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 2df75f1..a34b4f8 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection.impl;
 
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -27,6 +28,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 
 import org.junit.AfterClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -40,6 +42,7 @@ import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -57,7 +60,7 @@ import static org.mockito.Mockito.when;
  */
 public class EntityVersionCleanupTaskTest {
 
-    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4 );
+    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
 
 
     @AfterClass
@@ -119,10 +122,10 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //verify it was run
         verify( firstBatch ).execute();
@@ -187,10 +190,10 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //verify it was run
         verify( firstBatch, never() ).execute();
@@ -260,10 +263,10 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //we deleted the version
         //verify it was run
@@ -343,10 +346,10 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //we deleted the version
         //verify we deleted everything
@@ -440,18 +443,18 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         /**
          * While we're not done, release latches every 200 ms
          */
-        while(!cleanupTask.isDone()) {
+        while(!future.isDone()) {
             Thread.sleep( 200 );
             waitSemaphore.release( listenerCount );
         }
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //we deleted the version
         //verify we deleted everything
@@ -479,7 +482,7 @@ public class EntityVersionCleanupTaskTest {
         /**
          * only 1 thread on purpose, we want to saturate the task
          */
-        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1);
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0);
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
@@ -496,17 +499,16 @@ public class EntityVersionCleanupTaskTest {
         final int sizeToReturn = 10;
 
 
-        final int listenerCount = 1;
+        final int listenerCount = 2;
 
         final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
 
-        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+        final SlowListener slowListener = new SlowListener( latch, waitSemaphore );
+        final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
 
-        final List<EntityVersionDeleted> listeners = new ArrayList<>();
 
-        listeners.add( listener1 );
 
         final Id applicationId = new SimpleId( "application" );
 
@@ -526,11 +528,16 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask firstTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(slowListener), appScope, entityId, version );
+
+
+
+        //change the listeners to one that is just invoked quickly
+
 
         EntityVersionCleanupTask secondTask =
                       new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                              mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                              mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(runListener), appScope, entityId, version );
 
 
         final MutationBatch firstBatch = mock( MutationBatch.class );
@@ -548,24 +555,24 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( firstTask );
+        ListenableFuture<Void> future1 =  taskExecutor.submit( firstTask );
 
         //now start another task while the slow running task is running
-        taskExecutor.submit( secondTask );
+        ListenableFuture<Void> future2 =  taskExecutor.submit( secondTask );
 
         //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
-        secondTask.get();
+        future2.get();
 
         /**
          * While we're not done, release latches every 200 ms
          */
-        while(!firstTask.isDone()) {
+        while(!future1.isDone()) {
             Thread.sleep( 200 );
             waitSemaphore.release( listenerCount );
         }
 
         //wait for the task
-        firstTask.get();
+        future1.get();
 
         //we deleted the version
         //verify we deleted everything

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index d4cc915..b18687a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,6 +1,7 @@
 package org.apache.usergrid.persistence.core.task;
 
 
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
@@ -61,7 +62,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
 
     @Override
-    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
+    public <V> ListenableFuture<V> submit( final Task<V> task ) {
 
         final ListenableFuture<V> future;
 
@@ -71,26 +72,31 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
             /**
              * Log our success or failures for debugging purposes
              */
-            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+            Futures.addCallback( future, new TaskFutureCallBack<V>( task ) );
         }
         catch ( RejectedExecutionException ree ) {
-            task.rejected();
-            return Futures.immediateCancelledFuture();
+            return Futures.immediateFuture( task.rejected());
         }
 
         return future;
     }
 
 
+    @Override
+    public void shutdown() {
+        this.executorService.shutdownNow();
+    }
+
+
     /**
      * Callback for when the task succeeds or fails.
      */
-    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+    private static final class TaskFutureCallBack<V> implements FutureCallback<V> {
 
-        private final Task<V, I> task;
+        private final Task<V> task;
 
 
-        private TaskFutureCallBack( Task<V, I> task ) {
+        private TaskFutureCallBack( Task<V> task ) {
             this.task = task;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 9a0b857..5890627 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,30 +1,14 @@
 package org.apache.usergrid.persistence.core.task;
 
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.RecursiveTask;
 
 
 /**
  * The task to execute
  */
-public interface Task<V, I> extends Callable<V> {
-
-    /**
-     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
-     */
-    public abstract I getId();
-
-
-    @Override
-    protected V compute() {
-        try {
-            return executeTask();
-        }
-        catch ( Exception e ) {
-            exceptionThrown( e );
-            throw new RuntimeException( e );
-        }
-    }
+public interface Task<V> extends Callable<V> {
 
 
     /**
@@ -40,7 +24,7 @@ public interface Task<V, I> extends Callable<V> {
      * request and process later (lazy repair for instance ) do so.
      *
      */
-    void rejected();
+    V rejected();
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index fa63eee..34f57e5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -29,7 +29,7 @@ public class NamedTaskExecutorImplTest {
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
 
         executor.submit( task );
 
@@ -185,7 +185,7 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    private static abstract class TestTask<V> implements Task<V, UUID> {
+    private static abstract class TestTask<V> implements Task<V> {
 
         private final List<Throwable> exceptions;
         private final CountDownLatch exceptionLatch;
@@ -203,11 +203,6 @@ public class NamedTaskExecutorImplTest {
         }
 
 
-        @Override
-        public UUID getId() {
-            return UUIDGenerator.newTimeUUID();
-        }
-
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -217,8 +212,9 @@ public class NamedTaskExecutorImplTest {
 
 
         @Override
-        public void rejected() {
+        public V rejected() {
             rejectedLatch.countDown();
+            return null;
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index be7fbe4..6cd98a7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -315,7 +315,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
+    private final class ShardAuditTask implements Task<AuditResult> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -329,23 +329,18 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             this.group = group;
         }
 
-
-        @Override
-        public ShardAuditKey getId() {
-            return new ShardAuditKey( scope, edgeMeta, group );
-        }
-
-
-        @Override
+         @Override
         public void exceptionThrown( final Throwable throwable ) {
             LOG.error( "Unable to execute audit for shard of {}", throwable );
         }
 
 
         @Override
-        public void rejected() {
+        public AuditResult rejected() {
             //ignore, if this happens we don't care, we're saturated, we can check later
-            LOG.error( "Rejected audit for shard of {}", getId() );
+            LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group );
+
+            return AuditResult.NOT_CHECKED;
         }
 
 
@@ -418,29 +413,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private static final class ShardAuditKey {
-        private final ApplicationScope scope;
-        private final DirectedEdgeMeta edgeMeta;
-        private final ShardEntryGroup group;
-
-
-        private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
-                               final ShardEntryGroup group ) {
-            this.scope = scope;
-            this.edgeMeta = edgeMeta;
-            this.group = group;
-        }
-
-
-        @Override
-        public String toString() {
-            return "ShardAuditKey{" +
-                    "scope=" + scope +
-                    ", edgeMeta=" + edgeMeta +
-                    ", group=" + group +
-                    '}';
-        }
-    }
 
 
     /**


Mime
View raw message