usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [01/19] git commit: Added task execution framework
Date Wed, 01 Oct 2014 20:01:00 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-rebuildable-index 3ec144f3b -> a644034c7


Added task execution framework


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/66e508a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/66e508a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/66e508a9

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: 66e508a942d33051c95bfcd80e19b87a40812370
Parents: c667c18
Author: Todd Nine <toddnine@apache.org>
Authored: Wed Sep 24 16:47:24 2014 -0600
Committer: Todd Nine <toddnine@apache.org>
Committed: Wed Sep 24 16:47:24 2014 -0600

----------------------------------------------------------------------
 .../core/task/NamedTaskExecutorImpl.java        | 162 +++++++++++++
 .../usergrid/persistence/core/task/Task.java    |  37 +++
 .../persistence/core/task/TaskExecutor.java     |  14 ++
 .../core/task/NamedTaskExecutorImplTest.java    | 233 +++++++++++++++++++
 4 files changed, 446 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
new file mode 100644
index 0000000..8ba73a0
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -0,0 +1,162 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+
+/**
+ * Implementation of the task executor with a unique name and size
+ */
+public class NamedTaskExecutorImpl implements TaskExecutor {
+
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class
);
+
+    private final ListeningExecutorService executorService;
+
+
+    /**
+     * Builds an executor backed by a named thread pool with a bounded amount of pending work.
+     *
+     * @param name The name of this instance of the task executor, must not be null or empty
+     * @param poolSize The size of the pool, passed to the pool as its maximum number of workers.  Must be
+     * greater than 0
+     * @param queueLength The length of tasks to keep in the queue, must be 0 or more.  A length of 0 selects a
+     * {@link SynchronousQueue}, any other length a bounded {@link ArrayBlockingQueue}
+     */
+    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+        Preconditions.checkNotNull( name );
+        Preconditions.checkArgument( name.length() > 0, "name must have a length" );
+        Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
+
+
+        //queueLength can not be negative past this point, so 0 is the only "no queue" case
+        final BlockingQueue<Runnable> workQueue;
+
+        if ( queueLength == 0 ) {
+            workQueue = new SynchronousQueue<Runnable>();
+        }
+        else {
+            workQueue = new ArrayBlockingQueue<Runnable>( queueLength );
+        }
+
+        final MaxSizeThreadPool pool = new MaxSizeThreadPool( name, poolSize, workQueue );
+
+        executorService = MoreExecutors.listeningDecorator( pool );
+    }
+
+
+    /**
+     * Hands the task to the underlying executor service.  When the submission is refused with a
+     * {@link RejectedExecutionException}, {@link Task#rejected()} is invoked on the calling thread.  Otherwise a
+     * callback is attached to the returned future to log the outcome and report failures to the task.
+     */
+    @Override
+    public <V, I> void submit( final Task<V, I> task ) {
+
+        try {
+            final ListenableFuture<V> pending = executorService.submit( task );
+
+            //Log our success or failures for debugging purposes
+            Futures.addCallback( pending, new TaskFutureCallBack<V, I>( task ) );
+        }
+        catch ( RejectedExecutionException ree ) {
+            //the task was refused, let the task itself decide how to react
+            task.rejected();
+        }
+    }
+
+
+    /**
+     * Callback for when the task succeeds or fails.  Success is only logged at debug level; a failure is logged
+     * at error level and then handed to {@link Task#exceptionThrown(Throwable)}.
+     */
+    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+
+        private final Task<V, I> task;
+
+
+        /**
+         * @param task The task whose outcome this callback reports
+         */
+        private TaskFutureCallBack( Task<V, I> task ) {
+            this.task = task;
+        }
+
+
+        /**
+         * Logs the completed task.
+         *
+         * @param result The value returned by the task, unused
+         */
+        @Override
+        public void onSuccess( @Nullable final V result ) {
+            //the placeholder is required: without it slf4j drops the task argument from the message
+            LOG.debug( "Successfully completed task {}", task );
+        }
+
+
+        /**
+         * Logs the failure and notifies the task.
+         *
+         * @param t The throwable the future failed with
+         */
+        @Override
+        public void onFailure( final Throwable t ) {
+            //a trailing Throwable is logged with its stack trace, so no placeholder is needed here
+            LOG.error( "Unable to execute task.  Exception is ", t );
+
+            task.exceptionThrown( t );
+        }
+    }
+
+
+    /**
+     * Create a thread pool that will reject work if our audit tasks become overwhelmed.
+     *
+     * The pool runs up to workerSize tasks concurrently, parks further tasks in the supplied queue, and passes
+     * anything beyond that to the {@link RejectedHandler}.
+     */
+    private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        /**
+         * @param name The prefix used for the names of the worker threads
+         * @param workerSize The number of worker threads that may run tasks at the same time
+         * @param queue The queue holding tasks that are waiting for a free worker
+         */
+        public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+
+            //core == max: a ThreadPoolExecutor only grows past its core size once the queue refuses a task, so
+            //with a core size of 1 and a bounded queue, the queue had to fill before a 2nd worker was started
+            super( workerSize, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+                    new RejectedHandler() );
+
+            //let idle workers expire after the keep alive time, so an unused pool does not pin workerSize threads
+            allowCoreThreadTimeOut( true );
+        }
+    }
+
+
+    /**
+     * Thread factory that will name and count threads for easier debugging.  Each thread is a daemon named with
+     * the factory name, a dash, and a counter that starts at 1.
+     */
+    private static final class CountingThreadFactory implements ThreadFactory {
+
+        private final String name;
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
+
+        /**
+         * @param name The prefix for the name of every thread this factory creates
+         */
+        private CountingThreadFactory( final String name ) {
+            this.name = name;
+        }
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+            final String threadName = name + "-" + threadCounter.incrementAndGet();
+
+            final Thread worker = new Thread( r, threadName );
+            worker.setDaemon( true );
+
+            return worker;
+        }
+    }
+
+
+    /**
+     * The handler for executions the pool refuses.  It logs the refused runnable and throws a
+     * {@link RejectedExecutionException}.
+     */
+    private static final class RejectedHandler implements RejectedExecutionHandler {
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+            LOG.warn( "Audit queue full, rejecting audit task {}", r );
+
+            throw new RejectedExecutionException( "Unable to run task, queue full" );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
new file mode 100644
index 0000000..8b1ed22
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -0,0 +1,37 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.Callable;
+
+
+
+/**
+ * The task to execute.  A task is a {@link Callable} that additionally exposes an identifier and receives
+ * notifications when it fails or when it could not be scheduled.
+ *
+ * @param <V> The type of the value returned by the task
+ * @param <I> The type of the identifier of the task
+ */
+public interface Task<V, I> extends Callable<V> {
+
+    /**
+     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the
+     * future.
+     *
+     * @return The identifier of this task
+     */
+    I getId();
+
+    /**
+     * Invoked when this task throws an uncaught exception.
+     *
+     * @param throwable The exception that was thrown by the task
+     */
+    void exceptionThrown(final Throwable throwable);
+
+    /**
+     * Invoked by the thread attempting to schedule the task when we weren't able to run this task.
+     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
+     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop
+     * the request and process later (lazy repair for instance) do so.
+     */
+    void rejected();
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
new file mode 100644
index 0000000..e60da83
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * An interface for execution of tasks.
+ */
+public interface TaskExecutor {
+
+    /**
+     * Submit the task asynchronously.  Nothing is returned to the caller; the task declares the callbacks
+     * {@link Task#exceptionThrown(Throwable)} and {@link Task#rejected()} for reporting its outcome.
+     *
+     * @param task The task to execute
+     * @param <V> The type of the value returned by the task
+     * @param <I> The type of the identifier of the task
+     */
+    public <V, I> void submit(Task<V, I> task);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
new file mode 100644
index 0000000..9da9263
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -0,0 +1,233 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+
+/**
+ * Tests for the named task executor implementation
+ */
+public class NamedTaskExecutorImplTest {
+
+
+    /**
+     * A task submitted to an idle executor is run, and is neither rejected nor reported as failed.
+     */
+    @Test
+    public void jobSuccess() throws InterruptedException {
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+        //the exception and rejected latches start at 1 so that an unexpected callback is visible as a count of 0
+        final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the await result is not checked, so verify the task really ran instead of timing out
+        assertEquals( 0l, runLatch.getCount() );
+
+        assertEquals( 1l, exceptionLatch.getCount() );
+
+        assertEquals( 1l, rejectedLatch.getCount() );
+    }
+
+
+    /**
+     * A task whose call method throws gets that same exception instance passed to its exceptionThrown
+     * callback, and is not rejected.
+     */
+    @Test
+    public void exceptionThrown() throws InterruptedException {
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "exceptionThrown", 1, 0 );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
+        //starts at 1 so that an unexpected rejection is visible as a count of 0
+        final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        final RuntimeException re = new RuntimeException( "throwing exception" );
+
+        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+            @Override
+            public Void call() throws Exception {
+                super.call();
+                throw re;
+            }
+        };
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+        exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //fail on the assertion rather than on an IndexOutOfBoundsException when the callback never fired
+        assertEquals( 0l, exceptionLatch.getCount() );
+
+        assertSame( re, task.exceptions.get( 0 ) );
+
+
+        assertEquals( 1l, rejectedLatch.getCount() );
+    }
+
+
+    /**
+     * With a single worker and no queue, a second task submitted while the first one is still running is
+     * rejected and never run.
+     */
+    @Test
+    public void noCapacity() throws InterruptedException {
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "noCapacity", 1, 0 );
+
+        //the exception and rejected latches start at 1 so that an unexpected callback is visible as a count of 0
+        final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+
+        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+            @Override
+            public Void call() throws Exception {
+                super.call();
+
+                //park this thread so it takes up a task and the next is rejected.  The latch is never counted
+                //down, and unlike Object.wait() it can not return early because of a spurious wakeup
+                new CountDownLatch( 1 ).await();
+
+                return null;
+            }
+        };
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the first task must be occupying the only worker, and must have been accepted
+        assertEquals( 0l, runLatch.getCount() );
+        assertEquals( 1l, rejectedLatch.getCount() );
+
+        //now submit the second task
+
+
+        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch secondExceptionLatch = new CountDownLatch( 1 );
+        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+
+        //the second task must get its own latches, otherwise its rejection can never be observed
+        final TestTask<Void> testTask =
+                new TestTask<Void>( secondExceptionLatch, secondRejectedLatch, secondRunLatch ) {};
+
+        executor.submit( testTask );
+
+
+        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the await result is not checked, so verify the rejection really happened
+        assertEquals( 0l, secondRejectedLatch.getCount() );
+
+        //we've been rejected, just double check we didn't run or fail
+
+        assertEquals( 1l, secondRunLatch.getCount() );
+        assertEquals( 1l, secondExceptionLatch.getCount() );
+    }
+
+
+    /**
+     * With a single worker and a queue of 10, the first 11 tasks are accepted (1 running, 10 waiting) and the
+     * next task is rejected and never run.
+     */
+    @Test
+    public void noCapacityWithQueue() throws InterruptedException {
+
+        final int threadPoolSize = 1;
+        final int queueSize = 10;
+
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "noCapacityWithQueue", threadPoolSize, queueSize );
+
+        //the exception and rejected latches start at 1 so that an unexpected callback is visible as a count of 0
+        final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        //one task per worker plus one per queue slot fills the executor completely
+        int iterations = threadPoolSize + queueSize;
+
+        for(int i = 0; i < iterations; i ++) {
+
+            final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+                @Override
+                public Void call() throws Exception {
+                    super.call();
+
+                    //park this thread so it takes up a task and the next is rejected.  The latch is never
+                    //counted down, and unlike Object.wait() it can not return early on a spurious wakeup
+                    new CountDownLatch( 1 ).await();
+
+                    return null;
+                }
+            };
+            executor.submit( task );
+        }
+
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //a task must be occupying the worker, and none of the filler tasks may have been rejected
+        assertEquals( 0l, runLatch.getCount() );
+        assertEquals( 1l, rejectedLatch.getCount() );
+
+        //now submit the task that no longer fits
+
+
+        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch secondExceptionLatch = new CountDownLatch( 1 );
+        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+
+        //this task must get its own latches, otherwise its rejection can never be observed
+        final TestTask<Void> testTask =
+                new TestTask<Void>( secondExceptionLatch, secondRejectedLatch, secondRunLatch ) {};
+
+        executor.submit( testTask );
+
+
+        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the await result is not checked, so verify the rejection really happened
+        assertEquals( 0l, secondRejectedLatch.getCount() );
+
+        //we've been rejected, just double check we didn't run or fail
+
+        assertEquals( 1l, secondRunLatch.getCount() );
+        assertEquals( 1l, secondExceptionLatch.getCount() );
+    }
+
+
+    /**
+     * Base task for the tests.  Every callback counts down the latch handed in for it, and the throwables passed
+     * to exceptionThrown are collected so a test can inspect them.
+     */
+    private static abstract class TestTask<V> implements Task<V, UUID> {
+
+        private final List<Throwable> exceptions = new ArrayList<>();
+        private final CountDownLatch exceptionLatch;
+        private final CountDownLatch rejectedLatch;
+        private final CountDownLatch runLatch;
+
+
+        /**
+         * @param onException Counted down each time exceptionThrown is invoked
+         * @param onRejected Counted down each time rejected is invoked
+         * @param onRun Counted down each time call is invoked
+         */
+        private TestTask( final CountDownLatch onException, final CountDownLatch onRejected,
+                          final CountDownLatch onRun ) {
+            exceptionLatch = onException;
+            rejectedLatch = onRejected;
+            runLatch = onRun;
+        }
+
+
+        /** Returns a newly generated time UUID on every invocation. */
+        @Override
+        public UUID getId() {
+            return UUIDGenerator.newTimeUUID();
+        }
+
+
+        /** Records the throwable first, then signals the exception latch. */
+        @Override
+        public void exceptionThrown( final Throwable throwable ) {
+            exceptions.add( throwable );
+            exceptionLatch.countDown();
+        }
+
+
+        /** Signals the rejected latch. */
+        @Override
+        public void rejected() {
+            rejectedLatch.countDown();
+        }
+
+
+        /** Signals the run latch and produces no value. */
+        @Override
+        public V call() throws Exception {
+            runLatch.countDown();
+            return null;
+        }
+    }
+}


Mime
View raw message