usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject [03/15] usergrid git commit: Added an audit for shards when they are empty. They are only ever allocated when either a split + compaction needs to happen. Therefore if they don't have a compaction pending, they should never be empty.
Date Thu, 22 Oct 2015 18:29:32 GMT
Added an audit for shards when they are empty.  They are only ever allocated when either a
split + compaction needs to happen.  Therefore if they don't have a compaction pending, they
should never be empty.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9fddd754
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9fddd754
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9fddd754

Branch: refs/heads/2.1-release
Commit: 9fddd7542572f00233dabff1890187faf89f9e86
Parents: c417099
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Oct 21 15:21:23 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Oct 21 15:21:23 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/graph/GraphFig.java    |   2 +-
 .../persistence/graph/guice/GraphModule.java    |  11 +-
 .../impl/EdgeSerializationImpl.java             |  17 +-
 .../impl/shard/AsyncTaskExecutor.java           |  34 +++
 .../impl/shard/ShardGroupCompaction.java        |   4 -
 .../impl/shard/ShardGroupDeletion.java          |  68 ++++++
 .../impl/shard/impl/AsyncTaskExecutorImpl.java  |  53 +++++
 .../shard/impl/ShardGroupColumnIterator.java    |  61 +++---
 .../shard/impl/ShardGroupCompactionImpl.java    |  10 +-
 .../impl/shard/impl/ShardGroupDeletionImpl.java | 206 +++++++++++++++++++
 .../impl/shard/ShardGroupCompactionTest.java    |  30 ++-
 11 files changed, 451 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 28aab40..f0df7ff 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -133,7 +133,7 @@ public interface GraphFig extends GuicyFig {
     int getShardCacheRefreshWorkerCount();
 
 
-    @Default( "10" )
+    @Default( "20" )
     @Key( SHARD_AUDIT_WORKERS )
     int getShardAuditWorkerCount();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 6280c7c..d2476eb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -54,7 +54,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.TargetIdObservab
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.EdgeDataMigrationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigration;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
-import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
@@ -62,14 +62,17 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardA
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupDeletionImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardedEdgeSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
@@ -116,6 +119,12 @@ public abstract class GraphModule extends AbstractModule {
         bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class
);
 
         /**
+         * Binding for task tracker
+         */
+        bind( AsyncTaskExecutor.class ).to( AsyncTaskExecutorImpl.class );
+        bind( ShardGroupDeletion.class ).to( ShardGroupDeletionImpl.class );
+
+        /**
          * Bind our strategies based on their internal annotations.
          */
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 4c1ae79..9e25946 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
@@ -67,6 +68,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
     protected final EdgeColumnFamilies edgeColumnFamilies;
     protected final ShardedEdgeSerialization shardedEdgeSerialization;
     protected final TimeService timeService;
+    protected final ShardGroupDeletion shardGroupDeletion;
 
 
     @Inject
@@ -74,7 +76,8 @@ public class EdgeSerializationImpl implements EdgeSerialization {
                                   final GraphFig graphFig, final EdgeShardStrategy edgeShardStrategy,
                                   final EdgeColumnFamilies edgeColumnFamilies,
                                   final ShardedEdgeSerialization shardedEdgeSerialization,
-                                  final TimeService timeService ) {
+                                  final TimeService timeService, final ShardGroupDeletion
shardGroupDeletion ) {
+
 
 
         checkNotNull( keyspace, "keyspace required" );
@@ -83,6 +86,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
         checkNotNull( edgeColumnFamilies, "edgeColumnFamilies required" );
         checkNotNull( shardedEdgeSerialization, "shardedEdgeSerialization required" );
         checkNotNull( timeService, "timeService required" );
+        checkNotNull( shardGroupDeletion, "shardGroupDeletion require");
 
 
         this.keyspace = keyspace;
@@ -92,6 +96,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
         this.edgeColumnFamilies = edgeColumnFamilies;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
         this.timeService = timeService;
+        this.shardGroupDeletion = shardGroupDeletion;
     }
 
 
@@ -293,7 +298,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
 
         //now create a result iterator with our iterator of read shards
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, versionMetaData, shardGroupDeletion,
readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard>
readShards ) {
                 return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope,
search, readShards );
@@ -319,7 +324,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
 
         final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards(
scope, maxTimestamp, directedEdgeMeta );
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion,
readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard>
readShards ) {
                 return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope,
edgeType, readShards );
@@ -346,7 +351,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
         final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards(
scope, maxTimestamp, directedEdgeMeta );
 
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion,
readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard>
readShards ) {
                 return shardedEdgeSerialization
@@ -372,7 +377,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
 
         final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards(
scope, maxTimestamp, directedEdgeMeta );
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion,
readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard>
readShards ) {
                 return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope,
edgeType, readShards );
@@ -400,7 +405,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
         final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards(
scope, maxTimestamp, directedEdgeMeta );
 
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion,
readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard>
readShards ) {
                 return shardedEdgeSerialization

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
new file mode 100644
index 0000000..f4dc350
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+
+/**
+ * An interface for returning a singleton task executor
+ */
+public interface AsyncTaskExecutor {
+
+    /**
+     * Get the executor service for executing graph tasks
+     * @return
+     */
+    ListeningExecutorService getExecutorService();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 4fe1a63..0ff9fb1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -22,10 +22,6 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.util.concurrent.ListenableFuture;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
new file mode 100644
index 0000000..3d5a1ef
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+
+public interface ShardGroupDeletion {
+
+
+    /**
+     * Audit the shard entry group with the given NEW instance of the shardGroupColumnIterator.
Returns the future with
+     * the outcome of the deletion task
+     *
+     * @param shardEntryGroup The group to evaluate
+     * @param edgeIterator The new instance of the edge iterator
+     *
+     * @return The delete result with the state of the delete operation
+     */
+    ListenableFuture<DeleteResult> maybeDeleteShard( final ApplicationScope applicationScope,
+                                                     final DirectedEdgeMeta directedEdgeMeta,
+                                                     final ShardEntryGroup shardEntryGroup,
+                                                     final Iterator<MarkedEdge> edgeIterator
);
+
+
+    enum DeleteResult {
+        /**
+         * Returned if the shard was delete
+         */
+        DELETED,
+
+        /**
+         * The shard contains edges and cannot be deleted
+         */
+        CONTAINS_EDGES,
+
+        /**
+         * The shard is too new, and may not have been fully replicated, we can't delete
it safely
+         */
+        TOO_NEW,
+
+        /**
+         * Our capacity was saturated, we didnt' check the shard
+         */
+        NOT_CHECKED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
new file mode 100644
index 0000000..c534a22
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Implementation for a single task execution system within graph
+ */
+@Singleton
+public class AsyncTaskExecutorImpl implements AsyncTaskExecutor {
+
+
+    private final ListeningExecutorService taskExecutor;
+
+
+    @Inject
+    public AsyncTaskExecutorImpl(final GraphFig graphFig){
+        this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
+                    .createTaskExecutor( "GraphTaskExecutor", graphFig.getShardAuditWorkerCount(),
+                        graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT
) );
+    }
+
+
+    @Override
+    public ListeningExecutorService getExecutorService() {
+        return this.taskExecutor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
index a794a16..72b617f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -22,45 +22,52 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 import java.util.NoSuchElementException;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
 
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.util.RangeBuilder;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 
 /**
- *
  * Iterator to keep iterating over multiple shard groups to stream results
  *
  * @param <T> The parsed return type
  */
-public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
+public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> {
 
 
     private static final Logger logger = LoggerFactory.getLogger( ShardGroupColumnIterator.class
);
 
+    private final ApplicationScope applicationScope;
+    private final DirectedEdgeMeta directedEdgeMeta;
+    private final ShardGroupDeletion shardGroupDeletion;
     private final Iterator<ShardEntryGroup> entryGroupIterator;
-    private Iterator<T> elements;
 
 
-    public ShardGroupColumnIterator( final Iterator<ShardEntryGroup> entryGroupIterator
){
+    private Iterator<MarkedEdge> elements;
+
+
+    public ShardGroupColumnIterator( final ApplicationScope applicationScope, final DirectedEdgeMeta
directedEdgeMeta,
+                                     final ShardGroupDeletion shardGroupDeletion,
+                                     final Iterator<ShardEntryGroup> entryGroupIterator
) {
+        this.applicationScope = applicationScope;
+        this.directedEdgeMeta = directedEdgeMeta;
+        this.shardGroupDeletion = shardGroupDeletion;
         this.entryGroupIterator = entryGroupIterator;
     }
 
@@ -69,16 +76,16 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T>
{
     public boolean hasNext() {
 
 
-        if(elements == null){
+        if ( elements == null ) {
             return advance();
         }
 
-        if(elements.hasNext()){
+        if ( elements.hasNext() ) {
             return true;
         }
 
         //we've exhausted our shard groups and we don't have a next, we can't continue
-        if(!entryGroupIterator.hasNext()){
+        if ( !entryGroupIterator.hasNext() ) {
             return false;
         }
 
@@ -88,7 +95,7 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T>
{
 
 
     @Override
-    public T next() {
+    public MarkedEdge next() {
         if ( !hasNext() ) {
             throw new NoSuchElementException( "There are no more rows or columns left to
advance" );
         }
@@ -105,17 +112,17 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T>
{
 
     /**
      * Get an iterator for the shard entry group
+     *
      * @param readShards the read shards to use
-     * @return
      */
-    protected abstract Iterator<T> getIterator(Collection<Shard> readShards);
+    protected abstract Iterator<MarkedEdge> getIterator( Collection<Shard> readShards
);
 
 
-    public boolean advance(){
+    public boolean advance() {
 
         logger.trace( "Advancing from shard entry group iterator" );
 
-        while(entryGroupIterator.hasNext()){
+        while ( entryGroupIterator.hasNext() ) {
 
             final ShardEntryGroup group = entryGroupIterator.next();
 
@@ -126,16 +133,22 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T>
{
             /**
              * We're done, we have some columns to return
              */
-            if(elements.hasNext()){
+            if ( elements.hasNext() ) {
                 logger.trace( "Found edges in shard entry group {}", group );
                 return true;
             }
+            else {
+                logger.trace( "Our shard is empty, we need to perform an audit on shard group
{}", group );
 
+                //fire and forget if we miss it here, we'll get it next read
+                shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta,
group, getIterator( group.getReadShards() ) );
+
+
+            }
         }
 
         logger.trace( "Completed iterating shard group iterator" );
 
         return false;
-
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index c9e389f..21f2d72 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -45,6 +45,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -108,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
                                      final NodeShardAllocation nodeShardAllocation,
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
                                      final EdgeColumnFamilies edgeColumnFamilies, final Keyspace
keyspace,
-                                     final EdgeShardSerialization edgeShardSerialization)
{
+                                     final EdgeShardSerialization edgeShardSerialization,
+                                     final AsyncTaskExecutor asyncTaskExecutor) {
 
         this.timeService = timeService;
         this.countAudits = new AtomicLong();
@@ -124,9 +126,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
         this.shardAuditTaskTracker = new ShardAuditTaskTracker();
 
 
-        this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
-            .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(),
-                graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT
) );
+        this.taskExecutor = asyncTaskExecutor.getExecutorService();
     }
 
 
@@ -305,7 +305,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction
{
         }
 
         countAudits.getAndIncrement();
-        
+
         if(LOG.isDebugEnabled()) {
             LOG.debug("Auditing shard group. count is {} ", countAudits.get());
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
new file mode 100644
index 0000000..c51a021
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Implementation of the shard group deletion task
+ */
+@Singleton
+public class ShardGroupDeletionImpl implements ShardGroupDeletion {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardGroupDeletionImpl.class
);
+
+    private final ListeningExecutorService asyncTaskExecutor;
+    private final EdgeShardSerialization edgeShardSerialization;
+
+
+    @Inject
+    public ShardGroupDeletionImpl( final AsyncTaskExecutor asyncTaskExecutor,
+                                   final EdgeShardSerialization edgeShardSerialization )
{
+        this.edgeShardSerialization = edgeShardSerialization;
+        this.asyncTaskExecutor = asyncTaskExecutor.getExecutorService();
+    }
+
+
+    @Override
+    public ListenableFuture<DeleteResult> maybeDeleteShard( final ApplicationScope
applicationScope,
+                                                            final DirectedEdgeMeta directedEdgeMeta,
+                                                            final ShardEntryGroup shardEntryGroup,
+                                                            final Iterator<MarkedEdge>
edgeIterator ) {
+
+
+        /**
+         * Try and submit.  During back pressure, we may not be able to submit, that's ok.
 Better to drop than to
+         * hose the system
+         */
+        final ListenableFuture<DeleteResult> future;
+
+        try {
+            future = asyncTaskExecutor
+                .submit( new ShardDeleteTask( applicationScope, directedEdgeMeta, shardEntryGroup,
edgeIterator ) );
+        }
+        catch ( RejectedExecutionException ree ) {
+
+            //ignore, if this happens we don't care, we're saturated, we can check later
+            logger.error( "Rejected shard delete check for group {}", edgeIterator );
+
+            return Futures.immediateFuture( DeleteResult.NOT_CHECKED );
+        }
+
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<DeleteResult>() {
+            @Override
+            public void onSuccess( @Nullable final ShardGroupDeletion.DeleteResult result
) {
+                logger.trace( "Successfully completed delete of task {}", result );
+            }
+
+
+            @Override
+            public void onFailure( final Throwable t ) {
+                logger.error( "Unable to perform shard delete audit.  Exception is ", t );
+            }
+        } );
+
+        return future;
+    }
+
+
+    /**
+     * Execute the logic for the delete
+     */
+    private DeleteResult maybeDeleteShardInternal( final ApplicationScope applicationScope,
+                                                   final DirectedEdgeMeta directedEdgeMeta,
+                                                   final ShardEntryGroup shardEntryGroup,
+                                                   final Iterator<MarkedEdge> edgeIterator
) {
+
+        /**
+         * Compaction is pending, we cannot check it
+         */
+        if ( shardEntryGroup.isCompactionPending() ) {
+            return DeleteResult.NOT_CHECKED;
+        }
+
+
+        /**
+         * If any of the shards can't be deleted, then we can't delete it
+         */
+        for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+            if ( !shardEntryGroup.canBeDeleted( shard ) ) {
+                return DeleteResult.TOO_NEW;
+            }
+        }
+
+        /**
+         * We have edges, and therefore cannot delete them
+         */
+        if ( edgeIterator.hasNext() ) {
+            return DeleteResult.CONTAINS_EDGES;
+        }
+
+
+        //now we can proceed based on the shard meta state and we don't have any edge
+
+        MutationBatch rollup = null;
+
+        for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+            final MutationBatch shardRemovalMutation =
+                edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta
);
+
+            if ( rollup == null ) {
+                rollup = shardRemovalMutation;
+            }
+
+            else {
+                rollup.mergeShallow( shardRemovalMutation );
+            }
+        }
+
+
+        Preconditions.checkNotNull( rollup, "rollup should be assigned" );
+
+        try {
+            rollup.execute();
+        }
+        catch ( ConnectionException e ) {
+            logger.error( "Unable to execute shard deletion", e );
+            throw new RuntimeException( "Unable to execute shard deletion", e );
+        }
+
+        return DeleteResult.DELETED;
+    }
+
+
+    /**
+     * Glue for executing the task
+     */
+    private final class ShardDeleteTask implements Callable<DeleteResult> {
+
+        private final ApplicationScope applicationScope;
+        private final DirectedEdgeMeta directedEdgeMeta;
+        private final ShardEntryGroup shardEntryGroup;
+        private final Iterator<MarkedEdge> edgeIterator;
+
+
+        private ShardDeleteTask( final ApplicationScope applicationScope, final DirectedEdgeMeta
directedEdgeMeta,
+                                 final ShardEntryGroup shardEntryGroup, final Iterator<MarkedEdge>
edgeIterator ) {
+            this.applicationScope = applicationScope;
+            this.directedEdgeMeta = directedEdgeMeta;
+            this.shardEntryGroup = shardEntryGroup;
+            this.edgeIterator = edgeIterator;
+        }
+
+
+        @Override
+        public DeleteResult call() throws Exception {
+            return maybeDeleteShardInternal( applicationScope, directedEdgeMeta, shardEntryGroup,
edgeIterator );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 7d4b7f6..65f19ff 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -26,16 +26,20 @@ import java.util.Collection;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.util.IdGenerator;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.netflix.astyanax.Keyspace;
 
 import static org.junit.Assert.assertEquals;
@@ -47,6 +51,8 @@ import static org.mockito.Mockito.when;
 public class ShardGroupCompactionTest {
 
     protected GraphFig graphFig;
+    protected AsyncTaskExecutor asyncTaskExecutor;
+    protected ListeningExecutorService listeningExecutorService;
     protected ApplicationScope scope;
 
 
@@ -58,9 +64,25 @@ public class ShardGroupCompactionTest {
 
         when( graphFig.getShardAuditWorkerQueueSize() ).thenReturn( 1000 );
 
+
+
+        listeningExecutorService = MoreExecutors.listeningDecorator( TaskExecutorFactory
+            .createTaskExecutor( "GraphTaskExecutor", graphFig.getShardAuditWorkerCount(),
+                graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT
) );
+
+        asyncTaskExecutor = mock( AsyncTaskExecutor.class );
+
+        when( asyncTaskExecutor.getExecutorService() ).thenReturn( listeningExecutorService
);
+
+
         this.scope = new ApplicationScopeImpl( IdGenerator.createId( "application" ) );
     }
 
+    @After
+    public void shutDown(){
+        listeningExecutorService.shutdownNow();
+    }
+
 
     @Test
     public void shouldNotCompact() {
@@ -77,6 +99,7 @@ public class ShardGroupCompactionTest {
 
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class
);
 
+
         final long delta = 10000;
 
         final long createTime = 20000;
@@ -92,9 +115,8 @@ public class ShardGroupCompactionTest {
         when( timeService.getCurrentTime() ).thenReturn( timeNow );
 
         ShardGroupCompactionImpl compaction =
-                new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation,
shardedEdgeSerialization,
-                        edgeColumnFamilies, keyspace, edgeShardSerialization );
-
+            new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
+                edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor );
 
         DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId(
"source" ), "test" );
 
@@ -104,7 +126,7 @@ public class ShardGroupCompactionTest {
         }
         catch ( Throwable t ) {
             assertEquals( "Correct error message returned", "Compaction cannot be run yet.
 Ignoring compaction.",
-                    t.getMessage() );
+                t.getMessage() );
         }
     }
 


Mime
View raw message