usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [02/15] git commit: Migrated graph over to new task executor
Date Wed, 01 Oct 2014 16:32:06 GMT
Migrated graph over to new task executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4c72f5c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4c72f5c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4c72f5c6

Branch: refs/heads/two-dot-o
Commit: 4c72f5c636bae9b16df7578302538c7b4d9600e7
Parents: 66e508a
Author: Todd Nine <toddnine@apache.org>
Authored: Wed Sep 24 17:39:07 2014 -0600
Committer: Todd Nine <toddnine@apache.org>
Committed: Wed Sep 24 17:39:07 2014 -0600

----------------------------------------------------------------------
 .../collection/event/EntityVersionRemoved.java  |  26 +++
 .../collection/guice/CollectionModule.java      |  17 +-
 .../serialization/SerializationFig.java         |  32 ++-
 .../core/astyanax/AstyanaxKeyspaceProvider.java |   2 +
 .../persistence/core/guice/CommonModule.java    |   2 +
 .../core/task/NamedTaskExecutorImpl.java        |  14 +-
 .../persistence/core/task/TaskExecutor.java     |   2 +-
 .../usergrid/persistence/graph/GraphFig.java    |   2 +
 .../persistence/graph/guice/GraphModule.java    |  15 ++
 .../shard/impl/ShardGroupCompactionImpl.java    | 202 ++++++++++++-------
 .../impl/shard/ShardGroupCompactionTest.java    |   5 +-
 11 files changed, 227 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
new file mode 100644
index 0000000..dca575d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
@@ -0,0 +1,26 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+
+
+/**
+ *
+ * Invoked when an entity version is removed.  Note that this is not a deletion of the entity
itself,
+ * only the version itself.
+ *
+ */
+public interface EntityVersionRemoved {
+
+
+    /**
+     * The version specified was removed.
+     * @param scope
+     * @param entityId
+     * @param entityVersion
+     */
+    public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID
entityVersion);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 84f69db..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -36,6 +36,8 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
@@ -79,8 +81,7 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Inject
     @Write
-
-    public WriteStart write (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService)
{
+    public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
         final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
 
         return writeStart;
@@ -90,13 +91,21 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Inject
     @WriteUpdate
-
-    public WriteStart writeUpdate (MvccLogEntrySerializationStrategy logStrategy, UUIDService
uuidService) {
+    public WriteStart writeUpdate (final MvccLogEntrySerializationStrategy logStrategy) {
         final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.PARTIAL
);
 
         return writeStart;
     }
 
+    @Inject
+    @Singleton
+    @Provides
+    public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
+        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(),
serializationFig.getTaskPoolQueueSize() );
+    }
+
+
+
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 81302a6..7e69a19 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -15,25 +15,45 @@ public interface SerializationFig extends GuicyFig {
 
     /**
      * Time to live timeout in seconds.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.stage.transient.timeout" )
-    @Default( "5" )
+    @Key("collection.stage.transient.timeout")
+    @Default("5")
     int getTimeout();
 
     /**
      * Number of history items to return for delete.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.delete.history.size" )
-    @Default( "100" )
+    @Key("collection.delete.history.size")
+    @Default("100")
     int getHistorySize();
 
     /**
      * Number of items to buffer.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.buffer.size" )
-    @Default( "10" )
+    @Key("collection.buffer.size")
+    @Default("10")
     int getBufferSize();
+
+
+    /**
+     * The size of threads to have in the task pool
+     */
+    @Key( "collection.task.pool.threadsize" )
+    @Default( "20" )
+    int getTaskPoolThreadSize();
+
+
+
+    /**
+     * The size of threads to have in the task pool
+     */
+    @Key( "collection.task.pool.queuesize" )
+    @Default( "20" )
+    int getTaskPoolQueueSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
index 7caeaeb..8bd5a9f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import com.google.inject.Singleton;
 import com.netflix.astyanax.AstyanaxConfiguration;
 import com.netflix.astyanax.AstyanaxContext;
 import com.netflix.astyanax.Keyspace;
@@ -41,6 +42,7 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
  *
  * @author tnine
  */
+@Singleton
 public class AstyanaxKeyspaceProvider implements Provider<Keyspace> {
     private final CassandraFig cassandraFig;
     private final CassandraConfig cassandraConfig;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a4cc98a..5f461bb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -63,4 +63,6 @@ public class CommonModule extends AbstractModule {
 
 
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8ba73a0..8184937 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -53,7 +53,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
 
     @Override
-    public <V, I> void submit( final Task<V, I> task ) {
+    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
 
         final ListenableFuture<V> future;
 
@@ -67,8 +67,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
-            return;
+            return Futures.immediateCancelledFuture();
         }
+
+        return future;
     }
 
 
@@ -147,16 +149,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
         @Override
         public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor
) {
-
-            //            ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?,
?>> ) r;
-            //
-            //            future.
-            //            final Task<?, ?> task = ( Task<?, ?> ) r;
             LOG.warn( "Audit queue full, rejecting audit task {}", r );
 
             throw new RejectedExecutionException( "Unable to run task, queue full" );
-            //            LOG.warn( "Audit queue full, rejecting audit task {}", task );
-            //            task.rejected();
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index e60da83..b5491bc 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -10,5 +10,5 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> void submit(Task<V, I> task);
+    public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit(
Task<V, I> task );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 0a6ecfa..894e74a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -90,6 +90,8 @@ public interface GraphFig extends GuicyFig {
     public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
 
 
+
+
     @Default("1000")
     @Key(SCAN_PAGE_SIZE)
     int getScanPageSize();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f0e954b..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,6 +24,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
 import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -62,7 +64,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.Sizeb
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
 import com.google.inject.Key;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
 
@@ -144,6 +149,16 @@ public class GraphModule extends AbstractModule {
         migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
         migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class )
);
     }
+
+
+    @Inject
+    @Singleton
+    @Provides
+    public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
+        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(),
graphFig.getShardAuditWorkerQueueSize()  );
+    }
+
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 5076424..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -30,7 +30,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
@@ -46,6 +45,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -68,8 +69,6 @@ import com.google.common.hash.PrimitiveSink;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -91,7 +90,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
 
 
-    private final ListeningExecutorService executorService;
+    private final TaskExecutor taskExecutor;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final NodeShardAllocation nodeShardAllocation;
@@ -110,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
                                      final NodeShardAllocation nodeShardAllocation,
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
                                      final EdgeColumnFamilies edgeColumnFamilies, final Keyspace
keyspace,
-                                     final EdgeShardSerialization edgeShardSerialization
) {
+                                     final EdgeShardSerialization edgeShardSerialization,
+                                     final TaskExecutor taskExecutor ) {
 
         this.timeService = timeService;
         this.graphFig = graphFig;
@@ -124,8 +124,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
         this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
         this.shardAuditTaskTracker = new ShardAuditTaskTracker();
 
-        executorService = MoreExecutors.listeningDecorator(
-                new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize()
) );
+        this.taskExecutor = taskExecutor;
     }
 
 
@@ -232,7 +231,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
         resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard(
targetShard );
 
         /**
-         * We didn't move anything this pass, mark the shard as compacted.  If we move something,
it means that we missed it on the first pass
+         * We didn't move anything this pass, mark the shard as compacted.  If we move something,
+         * it means that we missed it on the first pass
          * or someone is still not writing to the target shard only.
          */
         if ( edgeCount == 0 ) {
@@ -293,91 +293,153 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.
 Better to drop than to
          * hose the system
          */
-        ListenableFuture<AuditResult> future = executorService.submit( new Callable<AuditResult>()
{
+        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask(
scope, edgeMeta, group ) );
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<AuditResult>() {
             @Override
-            public AuditResult call() throws Exception {
+            public void onSuccess( @Nullable final AuditResult result ) {
+                LOG.debug( "Successfully completed audit of task {}", result );
+            }
 
 
-                /**
-                 * We don't have a compaction pending.  Run an audit on the shards
-                 */
-                if ( !group.isCompactionPending() ) {
+            @Override
+            public void onFailure( final Throwable t ) {
+                LOG.error( "Unable to perform audit.  Exception is ", t );
+            }
+        } );
 
-                    /**
-                     * Check if we should allocate, we may want to
-                     */
+        return future;
+    }
 
-                    /**
-                     * It's already compacting, don't do anything
-                     */
-                    if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) )
{
-                        return AuditResult.CHECKED_NO_OP;
-                    }
 
-                    try {
+    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey>
{
 
-                        final boolean created = nodeShardAllocation.auditShard( scope, group,
edgeMeta );
-                        if ( !created ) {
-                            return AuditResult.CHECKED_NO_OP;
-                        }
-                    }
-                    finally {
-                        shardAuditTaskTracker.complete( scope, edgeMeta, group );
-                    }
+        private final ApplicationScope scope;
+        private final DirectedEdgeMeta edgeMeta;
+        private final ShardEntryGroup group;
 
 
-                    return AuditResult.CHECKED_CREATED;
-                }
+        public ShardAuditTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                               final ShardEntryGroup group ) {
+            this.scope = scope;
+            this.edgeMeta = edgeMeta;
+            this.group = group;
+        }
+
+
+        @Override
+        public ShardAuditKey getId() {
+            return new ShardAuditKey( scope, edgeMeta, group );
+        }
+
+
+        @Override
+        public void exceptionThrown( final Throwable throwable ) {
+            LOG.error( "Unable to execute audit for shard of {}", throwable );
+        }
+
+
+        @Override
+        public void rejected() {
+            //ignore, if this happens we don't care, we're saturated, we can check later
+            LOG.error( "Rejected audit for shard of {}", getId() );
+        }
 
-                //check our taskmanager
 
+        @Override
+        public AuditResult call() throws Exception {
+            /**
+             * We don't have a compaction pending.  Run an audit on the shards
+             */
+            if ( !group.isCompactionPending() ) {
 
                 /**
-                 * Do the compaction
+                 * Check if we should allocate, we may want to
                  */
-                if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
-                    /**
-                     * It's already compacting, don't do anything
-                     */
-                    if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group
) ) {
-                        return AuditResult.COMPACTING;
-                    }
 
-                    /**
-                     * We use a finally b/c we always want to remove the task track
-                     */
-                    try {
-                        CompactionResult result = compact( scope, edgeMeta, group );
-                        LOG.info( "Compaction result for compaction of scope {} with edge
meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} );
-                    }
-                    finally {
-                        shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+                /**
+                 * It's already compacting, don't do anything
+                 */
+                if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+                    return AuditResult.CHECKED_NO_OP;
+                }
+
+                try {
+
+                    final boolean created = nodeShardAllocation.auditShard( scope, group,
edgeMeta );
+                    if ( !created ) {
+                        return AuditResult.CHECKED_NO_OP;
                     }
-                    return AuditResult.COMPACTED;
+                }
+                finally {
+                    shardAuditTaskTracker.complete( scope, edgeMeta, group );
                 }
 
-                //no op, there's nothing we need to do to this shard
-                return AuditResult.NOT_CHECKED;
-            }
-        } );
 
-        /**
-         * Log our success or failures for debugging purposes
-         */
-        Futures.addCallback( future, new FutureCallback<AuditResult>() {
-            @Override
-            public void onSuccess( @Nullable final AuditResult result ) {
-                LOG.debug( "Successfully completed audit of task {}", result );
+                return AuditResult.CHECKED_CREATED;
             }
 
+            //check our taskmanager
 
-            @Override
-            public void onFailure( final Throwable t ) {
-                LOG.error( "Unable to perform audit.  Exception is ", t );
+
+            /**
+             * Do the compaction
+             */
+            if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+                /**
+                 * It's already compacting, don't do anything
+                 */
+                if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) )
{
+                    return AuditResult.COMPACTING;
+                }
+
+                /**
+                 * We use a finally b/c we always want to remove the task track
+                 */
+                try {
+                    CompactionResult result = compact( scope, edgeMeta, group );
+                    LOG.info(
+                            "Compaction result for compaction of scope {} with edge meta
data of {} and shard group " +
+                                    "{} is {}",
+                            new Object[] { scope, edgeMeta, group, result } );
+                }
+                finally {
+                    shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+                }
+                return AuditResult.COMPACTED;
             }
-        } );
 
-        return future;
+            //no op, there's nothing we need to do to this shard
+            return AuditResult.NOT_CHECKED;
+        }
+    }
+
+
+    private static final class ShardAuditKey {
+        private final ApplicationScope scope;
+        private final DirectedEdgeMeta edgeMeta;
+        private final ShardEntryGroup group;
+
+
+        private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                               final ShardEntryGroup group ) {
+            this.scope = scope;
+            this.edgeMeta = edgeMeta;
+            this.group = group;
+        }
+
+
+        @Override
+        public String toString() {
+            return "ShardAuditKey{" +
+                    "scope=" + scope +
+                    ", edgeMeta=" + edgeMeta +
+                    ", group=" + group +
+                    '}';
+        }
     }
 
 
@@ -531,7 +593,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
     }
 
 
-
     public static final class CompactionResult {
 
         public final long copiedEdges;
@@ -541,7 +602,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
         public final Shard compactedShard;
 
 
-
         private CompactionResult( final long copiedEdges, final Shard targetShard, final
Set<Shard> sourceShards,
                                   final Set<Shard> removedShards, final Shard compactedShard
) {
             this.copiedEdges = copiedEdges;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 1513e85..9f0792d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -35,6 +35,7 @@ import org.mockito.Matchers;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
@@ -84,6 +85,8 @@ public class ShardGroupCompactionTest {
 
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class
);
 
+        final TaskExecutor taskExecutor = mock(TaskExecutor.class);
+
         final long delta = 10000;
 
         final long createTime = 20000;
@@ -100,7 +103,7 @@ public class ShardGroupCompactionTest {
 
         ShardGroupCompactionImpl compaction =
                 new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation,
shardedEdgeSerialization,
-                        edgeColumnFamilies, keyspace, edgeShardSerialization );
+                        edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor
);
 
 
         DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId("source"),
"test" );


Mime
View raw message