usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [07/19] git commit: Revert "Converted tasks to ForkJoin tasks to allow for parallel execution."
Date Wed, 01 Oct 2014 20:01:06 GMT
Revert "Converted tasks to ForkJoin tasks to allow for parallel execution."

This reverts commit dc3f448c946219c44e3dbfadb294fea0b8048343.

The ForkJoinPool does not have the following characteristics:

- Ability to name a pool
- Ability to set queue size for rejection

As a result, it is not well suited for our asynchronous I/O tasks, and I'm moving back to
ExecutorService.

Conflicts:
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
	stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
	stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
	stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/af3726b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/af3726b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/af3726b8

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: af3726b894051fde9ce63c8db1db0996aa4f2992
Parents: d4f80e7
Author: Todd Nine <toddnine@apache.org>
Authored: Sat Sep 27 17:58:24 2014 -0600
Committer: Todd Nine <toddnine@apache.org>
Committed: Sat Sep 27 17:58:24 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   2 +-
 .../persistence/core/task/ImmediateTask.java    |  36 ------
 .../core/task/NamedTaskExecutorImpl.java        | 126 +++++++++++++------
 .../usergrid/persistence/core/task/Task.java    |  53 +++-----
 .../persistence/core/task/TaskExecutor.java     |   3 +-
 .../core/task/NamedTaskExecutorImplTest.java    | 116 ++++-------------
 .../persistence/graph/guice/GraphModule.java    |   2 +-
 .../impl/shard/ShardGroupCompaction.java        |   7 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  51 ++++++--
 9 files changed, 180 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index a6230e1..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -101,7 +101,7 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize()
);
+        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(),
serializationFig.getTaskPoolQueueSize() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
deleted file mode 100644
index f6c28a3..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.usergrid.persistence.core.task;
-
-
-/**
- * Does not perform computation, just returns the value passed to it
- *
- */
-public class ImmediateTask<V> extends Task<V> {
-
-
-    private final V returned;
-
-
-    protected ImmediateTask( final V returned ) {
-        this.returned = returned;
-    }
-
-
-
-    @Override
-    public V executeTask() throws Exception {
-        return returned;
-    }
-
-
-    @Override
-    public void exceptionThrown( final Throwable throwable ) {
-          //no op
-    }
-
-
-    @Override
-    public void rejected() {
-        //no op
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index bb9cac8..d4cc915 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.usergrid.persistence.core.task;
 
 
@@ -24,6 +6,7 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +18,10 @@ import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 
 /**
@@ -44,64 +31,131 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class
);
 
-    private final NamedForkJoinPool executorService;
+    private final ListeningExecutorService executorService;
 
     private final String name;
     private final int poolSize;
+    private final int queueLength;
 
 
     /**
      * @param name The name of this instance of the task executor
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that
can execute at once.
+     * @param queueLength The length of tasks to keep in the queue
      */
-    public NamedTaskExecutorImpl( final String name, final int poolSize ) {
-
-        //TODO, figure out how to name the fork/join threads in the pool
+    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength
) {
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more"
);
 
         this.name = name;
         this.poolSize = poolSize;
+        this.queueLength = queueLength;
+
+        final BlockingQueue<Runnable> queue =
+                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength
) : new SynchronousQueue<Runnable>();
 
-        this.executorService = new NamedForkJoinPool( poolSize );
+        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue
) );
     }
 
 
     @Override
-    public <V> Task<V> submit( final Task<V> task ) {
+    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
+
+        final ListenableFuture<V> future;
 
         try {
-            executorService.submit( task );
+            future = executorService.submit( task );
+
+            /**
+             * Log our success or failures for debugging purposes
+             */
+            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
+            return Futures.immediateCancelledFuture();
         }
 
-        return task;
+        return future;
     }
 
 
-    @Override
-    public void shutdown() {
-        executorService.shutdownNow();
+    /**
+     * Callback for when the task succeeds or fails.
+     */
+    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V>
{
+
+        private final Task<V, I> task;
+
+
+        private TaskFutureCallBack( Task<V, I> task ) {
+            this.task = task;
+        }
+
+
+        @Override
+        public void onSuccess( @Nullable final V result ) {
+            LOG.debug( "Successfully completed task ", task );
+        }
+
+
+        @Override
+        public void onFailure( final Throwable t ) {
+            LOG.error( "Unable to execute task.  Exception is ", t );
+
+            task.exceptionThrown( t );
+        }
     }
 
 
-    private final class NamedForkJoinPool extends ForkJoinPool {
+    /**
+     * Create a thread pool that will reject work if our audit tasks become overwhelmed
+     */
+    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
 
-        private NamedForkJoinPool( final int workerThreadCount ) {
-            //TODO, verify the scheduler at the end
-            super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(),
true );
+            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
+                    new RejectedHandler() );
         }
     }
 
 
-    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler {
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
 
         @Override
-        public void uncaughtException( final Thread t, final Throwable e ) {
-            LOG.error( "Uncaught exception on thread {} was {}", t, e );
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            Thread t = new Thread( r, name + "-" + newValue );
+
+            t.setDaemon( true );
+
+            return t;
         }
     }
+
+
+    /**
+     * The handler that will handle rejected executions and signal the interface
+     */
+    private final class RejectedHandler implements RejectedExecutionHandler {
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor
) {
+            LOG.warn( "{} task queue full, rejecting task {}", name, r );
+
+            throw new RejectedExecutionException( "Unable to run task, queue full" );
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 5d35ce4..9a0b857 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,35 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.usergrid.persistence.core.task;
 
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 
 
 /**
  * The task to execute
  */
-public abstract class Task<V> extends RecursiveTask<V> {
+public interface Task<V, I> extends Callable<V> {
+
+    /**
+     * Get the unique identifier of this task.  This may be used to collapse runnables over
a time period in the future
+     */
+    public abstract I getId();
+
 
     @Override
     protected V compute() {
@@ -43,22 +27,21 @@ public abstract class Task<V> extends RecursiveTask<V> {
     }
 
 
-
-    /**
-     * Execute the task
-     */
-    public abstract V executeTask() throws Exception;
-
     /**
      * Invoked when this task throws an uncaught exception.
+     * @param throwable
      */
-    public abstract void exceptionThrown( final Throwable throwable );
+    void exceptionThrown(final Throwable throwable);
 
     /**
-     * Invoked when we weren't able to run this task by the the thread attempting to schedule
the task. If this task
-     * MUST be run immediately, you can invoke the call method from within this event to
invoke the task in the
-     * scheduling thread.  Note that this has performance implications to the user.  If you
can drop the request and
-     * process later (lazy repair for instance ) do so.
+     * Invoked when we weren't able to run this task by the the thread attempting to schedule
the task.
+     * If this task MUST be run immediately, you can invoke the call method from within this
event to invoke the
+     * task in the scheduling thread.  Note that this has performance implications to the
user.  If you can drop the
+     * request and process later (lazy repair for instance ) do so.
+     *
      */
-    public abstract void rejected();
+    void rejected();
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index c6f5915..aaa9d60 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,10 +13,11 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V> Task<V > submit( Task<V> task );
+    public <V> ListenableFuture<V> submit( Task<V> task );
 
     /**
      * Stop the task executor without waiting for scheduled threads to run
      */
     public void shutdown();
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 9656b5f..fa63eee 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -23,13 +23,13 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void jobSuccess() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch,
runLatch ) {};
+        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch,
runLatch ) {};
 
         executor.submit( task );
 
@@ -44,7 +44,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void exceptionThrown() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -53,10 +53,9 @@ public class NamedTaskExecutorImplTest {
         final RuntimeException re = new RuntimeException( "throwing exception" );
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch,
runLatch ) {
-
-
             @Override
-            public Void executeTask() {
+            public Void call() throws Exception {
+                super.call();
                 throw re;
             }
         };
@@ -76,7 +75,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void noCapacity() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -85,8 +84,8 @@ public class NamedTaskExecutorImplTest {
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch,
runLatch ) {
             @Override
-            public Void executeTask() throws Exception {
-                super.executeTask();
+            public Void call() throws Exception {
+                super.call();
 
                 //park this thread so it takes up a task and the next is rejected
                 final Object mutex = new Object();
@@ -130,22 +129,22 @@ public class NamedTaskExecutorImplTest {
     public void noCapacityWithQueue() throws InterruptedException {
 
         final int threadPoolSize = 1;
-       
+        final int queueSize = 10;
 
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize
);
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize,
queueSize );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        int iterations = threadPoolSize ;
+        int iterations = threadPoolSize + queueSize;
 
-        for ( int i = 0; i < iterations; i++ ) {
+        for(int i = 0; i < iterations; i ++) {
 
             final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch,
runLatch ) {
                 @Override
-                public Void executeTask() throws Exception {
-                    super.executeTask();
+                public Void call() throws Exception {
+                    super.call();
 
                     //park this thread so it takes up a task and the next is rejected
                     final Object mutex = new Object();
@@ -161,6 +160,7 @@ public class NamedTaskExecutorImplTest {
         }
 
 
+
         runLatch.await( 1000, TimeUnit.MILLISECONDS );
 
         //now submit the second task
@@ -185,81 +185,12 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    @Test
-    public void jobTreeResult() throws InterruptedException {
-
-        final int threadPoolSize = 4;
-       
-
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize
);
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-
-        //accomodates for splitting the job 1->2->4 and joining
-        final CountDownLatch runLatch = new CountDownLatch( 7 );
-
-
-        TestRecursiveTask task = new TestRecursiveTask( exceptionLatch, rejectedLatch, runLatch,
1, 3 );
+    private static abstract class TestTask<V> implements Task<V, UUID> {
 
-         executor.submit( task );
-
-
-        //compute our result
-        Integer result = task.join();
-
-        //result should be 1+2*2+3*4
-        final int expected = 4*3;
-
-        assertEquals(expected, result.intValue());
-
-        //just to check our latches
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //now submit the second task
-
-
-    }
-
-
-    private static class TestRecursiveTask extends TestTask<Integer> {
-
-        private final int depth;
-        private final int maxDepth;
-
-        private TestRecursiveTask( final CountDownLatch exceptionLatch, final CountDownLatch
rejectedLatch,
-                                   final CountDownLatch runLatch, final int depth, final
int maxDepth ) {
-            super( exceptionLatch, rejectedLatch, runLatch );
-            this.depth = depth;
-            this.maxDepth = maxDepth;
-        }
-
-
-        @Override
-        public Integer executeTask() throws Exception {
-
-            if(depth == maxDepth ){
-                return depth;
-            }
-
-            TestRecursiveTask left = new TestRecursiveTask(exceptionLatch, rejectedLatch,
runLatch, depth+1, maxDepth  );
-
-            TestRecursiveTask right = new TestRecursiveTask(exceptionLatch, rejectedLatch,
runLatch, depth+1, maxDepth  );
-
-            //run our left in another thread
-            left.fork();
-
-            return right.compute() + left.join();
-        }
-    }
-
-
-    private static abstract class TestTask<V> extends Task<V> {
-
-        protected final List<Throwable> exceptions;
-        protected final CountDownLatch exceptionLatch;
-        protected final CountDownLatch rejectedLatch;
-        protected final CountDownLatch runLatch;
+        private final List<Throwable> exceptions;
+        private final CountDownLatch exceptionLatch;
+        private final CountDownLatch rejectedLatch;
+        private final CountDownLatch runLatch;
 
 
         private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
@@ -272,6 +203,11 @@ public class NamedTaskExecutorImplTest {
         }
 
 
+        @Override
+        public UUID getId() {
+            return UUIDGenerator.newTimeUUID();
+        }
+
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -287,7 +223,7 @@ public class NamedTaskExecutorImplTest {
 
 
         @Override
-        public V executeTask() throws Exception {
+        public V call() throws Exception {
             runLatch.countDown();
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index a3978fc..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -155,7 +155,7 @@ public class GraphModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
-        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount()
 );
+        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(),
graphFig.getShardAuditWorkerQueueSize()  );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index ed63587..4fe1a63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -27,8 +27,6 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -44,8 +42,9 @@ public interface ShardGroupCompaction {
      *
      * @return A ListenableFuture with the result.  Note that some
      */
-    public Task<AuditResult> evaluateShardGroup(
-            final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup
group );
+    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope
scope,
+                                                             final DirectedEdgeMeta edgeMeta,
+                                                             final ShardEntryGroup group
);
 
 
     public enum AuditResult {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index a84a0f0..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -37,13 +37,14 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.ImmediateTask;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -65,6 +66,9 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.PrimitiveSink;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -273,29 +277,45 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
 
 
     @Override
-    public Task<AuditResult> evaluateShardGroup( final ApplicationScope scope,
-                                                                final DirectedEdgeMeta edgeMeta,
-                                                                final ShardEntryGroup group
) {
+    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope
scope,
+                                                             final DirectedEdgeMeta edgeMeta,
+                                                             final ShardEntryGroup group
) {
 
         final double repairChance = random.nextDouble();
 
 
         //don't audit, we didn't hit our chance
         if ( repairChance > graphFig.getShardRepairChance() ) {
-            return new ImmediateTask<AuditResult>(  AuditResult.NOT_CHECKED ) {};
+            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
         }
 
         /**
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.
 Better to drop than to
          * hose the system
          */
-       return taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask(
scope, edgeMeta, group ) );
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<AuditResult>() {
+            @Override
+            public void onSuccess( @Nullable final AuditResult result ) {
+                LOG.debug( "Successfully completed audit of task {}", result );
+            }
 
 
+            @Override
+            public void onFailure( final Throwable t ) {
+                LOG.error( "Unable to perform audit.  Exception is ", t );
+            }
+        } );
+
+        return future;
     }
 
 
-    private final class ShardAuditTask extends Task<AuditResult> {
+    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey>
{
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -310,6 +330,11 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
         }
 
 
+        @Override
+        public ShardAuditKey getId() {
+            return new ShardAuditKey( scope, edgeMeta, group );
+        }
+
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -320,12 +345,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
         @Override
         public void rejected() {
             //ignore, if this happens we don't care, we're saturated, we can check later
-            LOG.error( "Rejected audit for shard of with scope {}, edgeMeta of {} and group
of {}", scope, edgeMeta, group );
+            LOG.error( "Rejected audit for shard of {}", getId() );
         }
 
 
         @Override
-        public AuditResult executeTask() throws Exception {
+        public AuditResult call() throws Exception {
             /**
              * We don't have a compaction pending.  Run an audit on the shards
              */
@@ -376,8 +401,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
                  */
                 try {
                     CompactionResult result = compact( scope, edgeMeta, group );
-                    LOG.info( "Compaction result for compaction of scope {} with edge meta
data of {} and shard group "
-                                    + "{} is {}", new Object[] { scope, edgeMeta, group,
result } );
+                    LOG.info(
+                            "Compaction result for compaction of scope {} with edge meta
data of {} and shard group " +
+                                    "{} is {}",
+                            new Object[] { scope, edgeMeta, group, result } );
                 }
                 finally {
                     shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -391,7 +418,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
     }
 
 
-    public static final class ShardAuditKey {
+    private static final class ShardAuditKey {
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
         private final ShardEntryGroup group;


Mime
View raw message