usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [05/19] git commit: Added tests for the log entry iterator
Date Wed, 01 Oct 2014 20:01:04 GMT
Added tests for the log entry iterator


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/018dbeeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/018dbeeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/018dbeeb

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: 018dbeeb8ad46bf536132120f78736fa2b0dd367
Parents: dc3f448
Author: Todd Nine <toddnine@apache.org>
Authored: Fri Sep 26 11:21:45 2014 -0600
Committer: Todd Nine <toddnine@apache.org>
Committed: Fri Sep 26 16:21:18 2014 -0600

----------------------------------------------------------------------
 .../impl/EntityVersionCleanupTask.java          |  52 +++++--
 .../serialization/impl/LogEntryIterator.java    | 111 ++++++++++++++
 .../serialization/impl/VersionIterator.java     | 111 --------------
 .../impl/EntityVersionCleanupTaskTest.java      |  40 +++++
 .../impl/LogEntryIteratorTest.java              | 131 ++++++++++++++++
 .../collection/util/LogEntryMock.java           | 152 +++++++++++++++++++
 .../core/task/NamedTaskExecutorImpl.java        | 131 +++-------------
 .../usergrid/persistence/core/task/Task.java    |  42 +++++
 8 files changed, 539 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 85afce6..29ca3ac 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,8 +1,11 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -11,8 +14,10 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
-import org.apache.usergrid.persistence.collection.serialization.impl.VersionIterator;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
 import org.apache.usergrid.persistence.core.entity.EntityVersion;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -22,7 +27,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
  * Cleans up previous versions from the specified version. Note that this means the version
passed in the io event is
  * retained, the range is exclusive.
  */
-class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>>
{
+public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>,
CollectionIoEvent<EntityVersion>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class
);
 
@@ -33,15 +38,19 @@ class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>,
Co
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
 
+    private final SerializationFig serializationFig;
+
 
     private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
                                       final MvccEntitySerializationStrategy entitySerializationStrategy,
                                       final CollectionIoEvent<EntityVersion> collectionIoEvent,
-                                      final List<EntityVersionDeleted> listeners )
{
+                                      final List<EntityVersionDeleted> listeners,
+                                      final SerializationFig serializationFig ) {
         this.collectionIoEvent = collectionIoEvent;
         this.listeners = listeners;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        this.serializationFig = serializationFig;
     }
 
 
@@ -79,34 +88,51 @@ class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>,
Co
         final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
 
 
-        VersionIterator versionIterator =
-                new VersionIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
1000 );
-
+        LogEntryIterator logEntryIterator =
+                new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
serializationFig.getHistorySize() );
 
-        UUID currentVersion = null;
 
         //for every entry, we want to clean it up with listeners
 
-        while ( versionIterator.hasNext() ) {
+        while ( logEntryIterator.hasNext() ) {
+
+            final MvccLogEntry logEntry = logEntryIterator.next();
 
-            currentVersion = versionIterator.next();
+
+            final UUID version = logEntry.getVersion();
+            List<ForkJoinTask<Void>> tasks = new ArrayList<>();
 
 
             //execute all the listeners
-            for ( EntityVersionDeleted listener : listeners ) {
-                listener.versionDeleted( scope, entityId, currentVersion );
+            for (final  EntityVersionDeleted listener : listeners ) {
+
+                tasks.add( new RecursiveTask<Void>() {
+                    @Override
+                    protected Void compute() {
+                        listener.versionDeleted( scope, entityId, version );
+                        return null;
+                    }
+                }.fork() );
+
+
             }
 
+            //wait for them to complete
+
+            joinAll(tasks);
+
             //we do multiple invocations on purpose.  Our log is our source of versions,
only delete from it
             //after every successful invocation of listeners and entity removal
-            entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+            entitySerializationStrategy.delete( scope, entityId, version ).execute();
 
-            logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+            logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
         }
 
 
         return collectionIoEvent;
     }
+
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
new file mode 100644
index 0000000..53eb6e3
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -0,0 +1,111 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity from the log from < the specified
maxVersion
+ *
+ */
+public class LogEntryIterator implements Iterator<MvccLogEntry> {
+
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final CollectionScope scope;
+    private final Id entityId;
+    private final int pageSize;
+
+
+    private Iterator<MvccLogEntry> elementItr;
+    private UUID nextStart;
+
+
+    /**
+     *
+     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     * @param maxVersion The max version of the entity.  Iterator will iterate from max to
min starting with the version < max
+     * @param pageSize The fetch size to get when querying the serialization strategy
+     */
+    public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                             final CollectionScope scope, final Id entityId, final UUID maxVersion,
+                             final int pageSize ) {
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.nextStart = maxVersion;
+        this.pageSize = pageSize;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
+            try {
+                advance();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to query cassandra", e );
+            }
+        }
+
+        return elementItr.hasNext();
+    }
+
+
+    @Override
+    public MvccLogEntry next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements exist" );
+        }
+
+        return elementItr.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Advance our iterator
+     */
+    public void advance() throws ConnectionException {
+
+        final int requestedSize = pageSize + 1;
+
+        //loop through even entry that's < this one and remove it
+        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId,
nextStart, requestedSize );
+
+        //we always remove the first version if it's equal since it's returned
+        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart
) ) {
+            results.remove( 0 );
+        }
+
+
+        //we have results, set our next start
+        if ( results.size() == pageSize ) {
+            nextStart = results.get( results.size() - 1 ).getVersion();
+        }
+        //nothing left to do
+        else {
+            nextStart = null;
+        }
+
+        elementItr = results.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
deleted file mode 100644
index 323f12d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Iterator that will iterate all versions of the entity from the log from < the specified
maxVersion
- *
- */
-public class VersionIterator implements Iterator<UUID> {
-
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final CollectionScope scope;
-    private final Id entityId;
-    private final int pageSize;
-
-
-    private Iterator<MvccLogEntry> elementItr;
-    private UUID nextStart;
-
-
-    /**
-     *
-     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
-     * @param scope The scope of the entity
-     * @param entityId The id of the entity
-     * @param maxVersion The max version of the entity.  Iterator will iterate from max to
min starting with the version < max
-     * @param pageSize The fetch size to get when querying the serialization strategy
-     */
-    public VersionIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                            final CollectionScope scope, final Id entityId, final UUID maxVersion,
-                            final int pageSize ) {
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.scope = scope;
-        this.entityId = entityId;
-        this.nextStart = maxVersion;
-        this.pageSize = pageSize;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-        if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
-            try {
-                advance();
-            }
-            catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to query cassandra", e );
-            }
-        }
-
-        return elementItr.hasNext();
-    }
-
-
-    @Override
-    public UUID next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more elements exist" );
-        }
-
-        return elementItr.next().getVersion();
-    }
-
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-
-
-    /**
-     * Advance our iterator
-     */
-    public void advance() throws ConnectionException {
-
-        final int requestedSize = pageSize + 1;
-
-        //loop through even entry that's < this one and remove it
-        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId,
nextStart, requestedSize );
-
-        //we always remove the first version if it's equal since it's returned
-        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart
) ) {
-            results.remove( 0 );
-        }
-
-
-        //we have results, set our next start
-        if ( results.size() == requestedSize ) {
-            nextStart = results.get( results.size() - 1 ).getVersion();
-        }
-        //nothing left to do
-        else {
-            nextStart = null;
-        }
-
-        elementItr = results.iterator();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
new file mode 100644
index 0000000..050ea9e
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Cleanup task tests
+ */
+public class EntityVersionCleanupTaskTest {
+
+    @Test
+    public void multiPageTask(){
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
new file mode 100644
index 0000000..9ee284b
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
@@ -0,0 +1,131 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests iterator paging
+ */
+public class LogEntryIteratorTest {
+
+
+    @Test
+    public void empty() throws ConnectionException {
+
+        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+        final CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "application" ), new SimpleId( "owner"
), "entities" );
+
+        final Id entityId = new SimpleId( "entity" );
+
+        final int pageSize = 100;
+
+
+        //set the start version, it should be discarded
+        UUID start = UUIDGenerator.newTimeUUID();
+
+        when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same(
start ), same( pageSize ) ) )
+                .thenReturn( new ArrayList<MvccLogEntry>() );
+
+
+        //now iterate we should get everything
+        LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope,
entityId, start, pageSize );
+
+
+        assertFalse( itr.hasNext() );
+    }
+
+
+    @Test
+    public void partialLastPage() throws ConnectionException {
+
+
+        final int pageSize = 10;
+        final int totalPages = 3;
+        final int lastPageSize = pageSize / 2;
+
+        //have one half page
+
+        pageElements( pageSize, totalPages, lastPageSize );
+    }
+
+
+    @Test
+    public void emptyLastPage() throws ConnectionException {
+
+
+        final int pageSize = 10;
+        final int totalPages = 3;
+        final int lastPageSize = 0;
+
+        //have one half page
+
+        pageElements( pageSize, totalPages, lastPageSize );
+    }
+
+
+    public void pageElements( final int pageSize, final int totalPages, final int lastPageSize
)
+            throws ConnectionException {
+
+        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+        final CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "application" ), new SimpleId( "owner"
), "entities" );
+
+        final Id entityId = new SimpleId( "entity" );
+
+
+        //have one half page
+        final int toGenerate = pageSize * totalPages + lastPageSize;
+
+
+        final LogEntryMock mockResults =
+                LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId,
toGenerate );
+
+        Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
+
+        //this element should be skipped
+        UUID start = expectedEntries.next().getVersion();
+
+        //now iterate we should get everything
+        LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope,
entityId, start, pageSize );
+
+
+        while ( expectedEntries.hasNext() && itr.hasNext() ) {
+            final MvccLogEntry expected = expectedEntries.next();
+
+            final MvccLogEntry returned = itr.next();
+
+            assertEquals( expected, returned );
+        }
+
+
+        assertFalse( itr.hasNext() );
+        assertFalse( expectedEntries.hasNext() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
new file mode 100644
index 0000000..a25bc94
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
@@ -0,0 +1,152 @@
+package org.apache.usergrid.persistence.collection.util;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Utility for constructing representative log entries for mock serialziation from high to
low
+ */
+public class LogEntryMock {
+
+
+    private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE);
+
+    private final Id entityId;
+
+
+    /**
+     * Create a mock list of versions of the specified size
+     *
+     * @param entityId The entity Id to use
+     * @param size The size to use
+     */
+    private LogEntryMock(final Id entityId, final int size ) {
+
+        this.entityId = entityId;
+
+        for ( int i = 0; i < size; i++ ) {
+
+            final UUID version = UUIDGenerator.newTimeUUID();
+
+            entries.put( version,
+                    new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE
) );
+        }
+    }
+
+
+    /**
+     * Init the mock with the given data structure
+     * @param logEntrySerializationStrategy The strategy to moc
+     * @param scope
+     * @throws ConnectionException
+     */
+    private void initMock(  final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
final  CollectionScope scope )
+
+            throws ConnectionException {
+
+        //wire up the mocks
+        when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class),
any(Integer.class)  )).thenAnswer( new Answer<List<MvccLogEntry>>() {
+
+
+            @Override
+            public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws
Throwable {
+                final UUID startVersion = ( UUID ) invocation.getArguments()[2];
+                final int count = (Integer)invocation.getArguments()[3];
+
+                final List<MvccLogEntry> results = new ArrayList<>( count );
+
+                final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true
).values().iterator();
+
+                for(int i = 0; i < count && itr.hasNext(); i ++){
+                    results.add( itr.next() );
+                }
+
+
+                return results;
+            }
+        } );
+    }
+
+
+    /**
+     * Get the entries (ordered from high to low) this mock contains
+     * @return
+     */
+    public Collection<MvccLogEntry> getEntries(){
+        return entries.values();
+    }
+
+    /**
+     *
+     * @param logEntrySerializationStrategy The mock to use
+     * @param scope The scope to use
+     * @param entityId The entityId to use
+     * @param size The number of entries to mock
+     * @throws ConnectionException
+     */
+    public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy
logEntrySerializationStrategy, final  CollectionScope scope,final Id entityId, final int size
)
+
+            throws ConnectionException {
+        LogEntryMock mock = new LogEntryMock( entityId, size );
+        mock.initMock( logEntrySerializationStrategy, scope );
+
+        return mock;
+    }
+
+
+    private static final class ReversedUUIDComparator implements Comparator<UUID> {
+
+        public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator();
+
+
+        @Override
+        public int compare( final UUID o1, final UUID o2 ) {
+            return UUIDComparator.staticCompare( o1, o2 ) * -1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index aba8cd5..4fc72c8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.usergrid.persistence.core.task;
 
 
@@ -36,7 +54,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
      * @param name The name of this instance of the task executor
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that
can execute at once.
      */
-    public NamedTaskExecutorImpl( final String name, final int poolSize) {
+    public NamedTaskExecutorImpl( final String name, final int poolSize ) {
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
@@ -44,12 +62,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
         this.name = name;
         this.poolSize = poolSize;
 
-//        final BlockingQueue<Runnable> queue =
-//                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength
) : new SynchronousQueue<Runnable>();
-//
-//        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue
) );
-
-       this.executorService =  new NamedForkJoinPool(poolSize);
+        this.executorService = new NamedForkJoinPool( poolSize );
     }
 
 
@@ -57,126 +70,30 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     public <V, I> Task<V, I> submit( final Task<V, I> task ) {
 
         try {
-           executorService.submit( task );
+            executorService.submit( task );
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
         }
 
         return task;
-
     }
 
 
-    /**
-     * Callback for when the task succeeds or fails.
-     */
-    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V>
{
-
-        private final Task<V, I> task;
-
-
-        private TaskFutureCallBack( Task<V, I> task ) {
-            this.task = task;
-        }
-
-
-        @Override
-        public void onSuccess( @Nullable final V result ) {
-            LOG.debug( "Successfully completed task ", task );
-        }
-
-
-        @Override
-        public void onFailure( final Throwable t ) {
-            LOG.error( "Unable to execute task.  Exception is ", t );
-
-            task.exceptionThrown( t );
-        }
-    }
-
-
-    private final class NamedForkJoinPool extends ForkJoinPool{
+    private final class NamedForkJoinPool extends ForkJoinPool {
 
         private NamedForkJoinPool( final int workerThreadCount ) {
             //TODO, verify the scheduler at the end
             super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(),
true );
         }
-
-
-
     }
 
-    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler{
+
+    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler {
 
         @Override
         public void uncaughtException( final Thread t, final Throwable e ) {
             LOG.error( "Uncaught exception on thread {} was {}", t, e );
         }
     }
-
-
-
-
-    private final class NamedWorkerThread extends ForkJoinWorkerThread{
-
-        /**
-         * Creates a ForkJoinWorkerThread operating in the given pool.
-         *
-         * @param pool the pool this thread works in
-         *
-         * @throws NullPointerException if pool is null
-         */
-        protected NamedWorkerThread(final String name,  final ForkJoinPool pool ) {
-            super( pool );
-        }
-    }
-    /**
-     * Create a thread pool that will reject work if our audit tasks become overwhelmed
-     */
-    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
-        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
-
-            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
-                    new RejectedHandler() );
-        }
-    }
-
-
-    /**
-     * Thread factory that will name and count threads for easier debugging
-     */
-    private final class CountingThreadFactory implements ThreadFactory {
-
-        private final AtomicLong threadCounter = new AtomicLong();
-
-
-        @Override
-        public Thread newThread( final Runnable r ) {
-            final long newValue = threadCounter.incrementAndGet();
-
-            Thread t = new Thread( r, name + "-" + newValue );
-
-            t.setDaemon( true );
-
-            return t;
-        }
-    }
-
-
-    /**
-     * The handler that will handle rejected executions and signal the interface
-     */
-    private final class RejectedHandler implements RejectedExecutionHandler {
-
-
-        @Override
-        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor
) {
-            LOG.warn( "{} task queue full, rejecting task {}", name, r );
-
-            throw new RejectedExecutionException( "Unable to run task, queue full" );
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index eb04c2c..4dccc72 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,6 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.usergrid.persistence.core.task;
 
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 
 
@@ -28,6 +50,26 @@ public abstract class Task<V, I> extends RecursiveTask<V> {
 
 
     /**
+     * Fork all tasks in the list except the last one.  The last will be run in the caller
thread
+     * The others will wait for join
+     * @param tasks
+     *
+     */
+    public <V, T extends ForkJoinTask<V>> List<V> joinAll(List<T>
tasks) throws ExecutionException, InterruptedException {
+
+        //don't fork the last one
+       List<V> results = new ArrayList<>(tasks.size());
+
+        for(T task: tasks){
+            results.add( task.join() );
+        }
+
+        return results;
+
+    }
+
+
+    /**
      * Execute the task
      */
     public abstract V executeTask() throws Exception;


Mime
View raw message