Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 597CA17D2E for ; Wed, 1 Oct 2014 20:01:01 +0000 (UTC) Received: (qmail 89313 invoked by uid 500); 1 Oct 2014 20:01:01 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 89263 invoked by uid 500); 1 Oct 2014 20:01:01 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 89178 invoked by uid 99); 1 Oct 2014 20:01:01 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Oct 2014 20:01:01 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 000C6925DC0; Wed, 1 Oct 2014 20:01:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: snoopdave@apache.org To: commits@usergrid.apache.org Date: Wed, 01 Oct 2014 20:01:04 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/19] git commit: Added tests for the log entry iterator Added tests for the log entry iterator Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/018dbeeb Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/018dbeeb Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/018dbeeb Branch: refs/heads/two-dot-o-rebuildable-index Commit: 018dbeeb8ad46bf536132120f78736fa2b0dd367 Parents: dc3f448 Author: Todd Nine Authored: Fri Sep 26 11:21:45 2014 -0600 Committer: Todd Nine Committed: Fri Sep 26 16:21:18 2014 -0600 ---------------------------------------------------------------------- .../impl/EntityVersionCleanupTask.java | 52 +++++-- .../serialization/impl/LogEntryIterator.java | 111 ++++++++++++++ .../serialization/impl/VersionIterator.java | 111 -------------- .../impl/EntityVersionCleanupTaskTest.java | 40 +++++ .../impl/LogEntryIteratorTest.java | 131 ++++++++++++++++ .../collection/util/LogEntryMock.java | 152 +++++++++++++++++++ .../core/task/NamedTaskExecutorImpl.java | 131 +++------------- .../usergrid/persistence/core/task/Task.java | 42 +++++ 8 files changed, 539 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java index 85afce6..29ca3ac 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java @@ -1,8 +1,11 @@ package org.apache.usergrid.persistence.collection.impl; +import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.RecursiveTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,8 +14,10 @@ import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; -import org.apache.usergrid.persistence.collection.serialization.impl.VersionIterator; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator; import org.apache.usergrid.persistence.core.entity.EntityVersion; import org.apache.usergrid.persistence.core.task.Task; import org.apache.usergrid.persistence.model.entity.Id; @@ -22,7 +27,7 @@ import org.apache.usergrid.persistence.model.entity.Id; * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is * retained, the range is exclusive. */ -class EntityVersionCleanupTask extends Task, CollectionIoEvent> { +public class EntityVersionCleanupTask extends Task, CollectionIoEvent> { private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class ); @@ -33,15 +38,19 @@ class EntityVersionCleanupTask extends Task, Co private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; private final MvccEntitySerializationStrategy entitySerializationStrategy; + private final SerializationFig serializationFig; + private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final MvccEntitySerializationStrategy entitySerializationStrategy, final CollectionIoEvent collectionIoEvent, - final List listeners ) { + final List listeners, + final SerializationFig serializationFig ) { this.collectionIoEvent = collectionIoEvent; this.listeners = listeners; this.logEntrySerializationStrategy = logEntrySerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; + this.serializationFig = serializationFig; } @@ -79,34 +88,51 @@ class EntityVersionCleanupTask extends Task, Co final UUID maxVersion = collectionIoEvent.getEvent().getVersion(); - VersionIterator versionIterator = - new VersionIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, 1000 ); - + LogEntryIterator logEntryIterator = + new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, serializationFig.getHistorySize() ); - UUID currentVersion = null; //for every entry, we want to clean it up with listeners - while ( versionIterator.hasNext() ) { + while ( logEntryIterator.hasNext() ) { + + final MvccLogEntry logEntry = logEntryIterator.next(); - currentVersion = versionIterator.next(); + + final UUID version = logEntry.getVersion(); + List> tasks = new ArrayList<>(); //execute all the listeners - for ( EntityVersionDeleted listener : listeners ) { - listener.versionDeleted( scope, entityId, currentVersion ); + for (final EntityVersionDeleted listener : listeners ) { + + tasks.add( new RecursiveTask() { + @Override + protected Void compute() { + listener.versionDeleted( scope, entityId, version ); + return null; + } + }.fork() ); + + } + //wait for them to complete + + joinAll(tasks); + //we do multiple invocations on purpose. Our log is our source of versions, only delete from it //after every successful invocation of listeners and entity removal - entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute(); + entitySerializationStrategy.delete( scope, entityId, version ).execute(); - logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute(); + logEntrySerializationStrategy.delete( scope, entityId, version ).execute(); } return collectionIoEvent; } + + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java new file mode 100644 index 0000000..53eb6e3 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java @@ -0,0 +1,111 @@ +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.UUID; + +import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + + +/** + * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion + * + */ +public class LogEntryIterator implements Iterator { + + + private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; + private final CollectionScope scope; + private final Id entityId; + private final int pageSize; + + + private Iterator elementItr; + private UUID nextStart; + + + /** + * + * @param logEntrySerializationStrategy The serialization strategy to get the log entries + * @param scope The scope of the entity + * @param entityId The id of the entity + * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version < max + * @param pageSize The fetch size to get when querying the serialization strategy + */ + public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final CollectionScope scope, final Id entityId, final UUID maxVersion, + final int pageSize ) { + this.logEntrySerializationStrategy = logEntrySerializationStrategy; + this.scope = scope; + this.entityId = entityId; + this.nextStart = maxVersion; + this.pageSize = pageSize; + } + + + @Override + public boolean hasNext() { + if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) { + try { + advance(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to query cassandra", e ); + } + } + + return elementItr.hasNext(); + } + + + @Override + public MvccLogEntry next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more elements exist" ); + } + + return elementItr.next(); + } + + + @Override + public void remove() { + throw new UnsupportedOperationException( "Remove is unsupported" ); + } + + + /** + * Advance our iterator + */ + public void advance() throws ConnectionException { + + final int requestedSize = pageSize + 1; + + //loop through even entry that's < this one and remove it + List results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize ); + + //we always remove the first version if it's equal since it's returned + if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { + results.remove( 0 ); + } + + + //we have results, set our next start + if ( results.size() == pageSize ) { + nextStart = results.get( results.size() - 1 ).getVersion(); + } + //nothing left to do + else { + nextStart = null; + } + + elementItr = results.iterator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java deleted file mode 100644 index 323f12d..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java +++ /dev/null @@ -1,111 +0,0 @@ -package org.apache.usergrid.persistence.collection.serialization.impl; - - -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.UUID; - -import org.apache.usergrid.persistence.collection.CollectionScope; -import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - - -/** - * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion - * - */ -public class VersionIterator implements Iterator { - - - private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; - private final CollectionScope scope; - private final Id entityId; - private final int pageSize; - - - private Iterator elementItr; - private UUID nextStart; - - - /** - * - * @param logEntrySerializationStrategy The serialization strategy to get the log entries - * @param scope The scope of the entity - * @param entityId The id of the entity - * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version < max - * @param pageSize The fetch size to get when querying the serialization strategy - */ - public VersionIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final CollectionScope scope, final Id entityId, final UUID maxVersion, - final int pageSize ) { - this.logEntrySerializationStrategy = logEntrySerializationStrategy; - this.scope = scope; - this.entityId = entityId; - this.nextStart = maxVersion; - this.pageSize = pageSize; - } - - - @Override - public boolean hasNext() { - if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) { - try { - advance(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to query cassandra", e ); - } - } - - return elementItr.hasNext(); - } - - - @Override - public UUID next() { - if ( !hasNext() ) { - throw new NoSuchElementException( "No more elements exist" ); - } - - return elementItr.next().getVersion(); - } - - - @Override - public void remove() { - throw new UnsupportedOperationException( "Remove is unsupported" ); - } - - - /** - * Advance our iterator - */ - public void advance() throws ConnectionException { - - final int requestedSize = pageSize + 1; - - //loop through even entry that's < this one and remove it - List results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize ); - - //we always remove the first version if it's equal since it's returned - if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { - results.remove( 0 ); - } - - - //we have results, set our next start - if ( results.size() == requestedSize ) { - nextStart = results.get( results.size() - 1 ).getVersion(); - } - //nothing left to do - else { - nextStart = null; - } - - elementItr = results.iterator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java new file mode 100644 index 0000000..050ea9e --- /dev/null +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.impl; + + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** + * Cleanup task tests + */ +public class EntityVersionCleanupTaskTest { + + @Test + public void multiPageTask(){ + + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java new file mode 100644 index 0000000..9ee284b --- /dev/null +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java @@ -0,0 +1,131 @@ +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.UUID; + +import org.junit.Test; + +import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; +import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry; +import org.apache.usergrid.persistence.collection.util.LogEntryMock; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; + +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Tests iterator paging + */ +public class LogEntryIteratorTest { + + + @Test + public void empty() throws ConnectionException { + + final MvccLogEntrySerializationStrategy logEntrySerializationStrategy = + mock( MvccLogEntrySerializationStrategy.class ); + + final CollectionScope scope = + new CollectionScopeImpl( new SimpleId( "application" ), new SimpleId( "owner" ), "entities" ); + + final Id entityId = new SimpleId( "entity" ); + + final int pageSize = 100; + + + //set the start version, it should be discarded + UUID start = UUIDGenerator.newTimeUUID(); + + when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) ) + .thenReturn( new ArrayList() ); + + + //now iterate we should get everything + LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize ); + + + assertFalse( itr.hasNext() ); + } + + + @Test + public void partialLastPage() throws ConnectionException { + + + final int pageSize = 10; + final int totalPages = 3; + final int lastPageSize = pageSize / 2; + + //have one half page + + pageElements( pageSize, totalPages, lastPageSize ); + } + + + @Test + public void emptyLastPage() throws ConnectionException { + + + final int pageSize = 10; + final int totalPages = 3; + final int lastPageSize = 0; + + //have one half page + + pageElements( pageSize, totalPages, lastPageSize ); + } + + + public void pageElements( final int pageSize, final int totalPages, final int lastPageSize ) + throws ConnectionException { + + final MvccLogEntrySerializationStrategy logEntrySerializationStrategy = + mock( MvccLogEntrySerializationStrategy.class ); + + final CollectionScope scope = + new CollectionScopeImpl( new SimpleId( "application" ), new SimpleId( "owner" ), "entities" ); + + final Id entityId = new SimpleId( "entity" ); + + + //have one half page + final int toGenerate = pageSize * totalPages + lastPageSize; + + + final LogEntryMock mockResults = + LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, toGenerate ); + + Iterator expectedEntries = mockResults.getEntries().iterator(); + + //this element should be skipped + UUID start = expectedEntries.next().getVersion(); + + //now iterate we should get everything + LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize ); + + + while ( expectedEntries.hasNext() && itr.hasNext() ) { + final MvccLogEntry expected = expectedEntries.next(); + + final MvccLogEntry returned = itr.next(); + + assertEquals( expected, returned ); + } + + + assertFalse( itr.hasNext() ); + assertFalse( expectedEntries.hasNext() ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java new file mode 100644 index 0000000..a25bc94 --- /dev/null +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java @@ -0,0 +1,152 @@ +package org.apache.usergrid.persistence.collection.util;/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.TreeMap; +import java.util.UUID; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry; +import org.apache.usergrid.persistence.collection.mvcc.entity.Stage; +import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.common.base.Preconditions; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.when; + + +/** + * Utility for constructing representative log entries for mock serialziation from high to low + */ +public class LogEntryMock { + + + private final TreeMap entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE); + + private final Id entityId; + + + /** + * Create a mock list of versions of the specified size + * + * @param entityId The entity Id to use + * @param size The size to use + */ + private LogEntryMock(final Id entityId, final int size ) { + + this.entityId = entityId; + + for ( int i = 0; i < size; i++ ) { + + final UUID version = UUIDGenerator.newTimeUUID(); + + entries.put( version, + new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) ); + } + } + + + /** + * Init the mock with the given data structure + * @param logEntrySerializationStrategy The strategy to moc + * @param scope + * @throws ConnectionException + */ + private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final CollectionScope scope ) + + throws ConnectionException { + + //wire up the mocks + when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class) )).thenAnswer( new Answer>() { + + + @Override + public List answer( final InvocationOnMock invocation ) throws Throwable { + final UUID startVersion = ( UUID ) invocation.getArguments()[2]; + final int count = (Integer)invocation.getArguments()[3]; + + final List results = new ArrayList<>( count ); + + final Iterator itr = entries.tailMap( startVersion, true ).values().iterator(); + + for(int i = 0; i < count && itr.hasNext(); i ++){ + results.add( itr.next() ); + } + + + return results; + } + } ); + } + + + /** + * Get the entries (ordered from high to low) this mock contains + * @return + */ + public Collection getEntries(){ + return entries.values(); + } + + /** + * + * @param logEntrySerializationStrategy The mock to use + * @param scope The scope to use + * @param entityId The entityId to use + * @param size The number of entries to mock + * @throws ConnectionException + */ + public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final CollectionScope scope,final Id entityId, final int size ) + + throws ConnectionException { + LogEntryMock mock = new LogEntryMock( entityId, size ); + mock.initMock( logEntrySerializationStrategy, scope ); + + return mock; + } + + + private static final class ReversedUUIDComparator implements Comparator { + + public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator(); + + + @Override + public int compare( final UUID o1, final UUID o2 ) { + return UUIDComparator.staticCompare( o1, o2 ) * -1; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java index aba8cd5..4fc72c8 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.usergrid.persistence.core.task; @@ -36,7 +54,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor { * @param name The name of this instance of the task executor * @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once. */ - public NamedTaskExecutorImpl( final String name, final int poolSize) { + public NamedTaskExecutorImpl( final String name, final int poolSize ) { Preconditions.checkNotNull( name ); Preconditions.checkArgument( name.length() > 0, "name must have a length" ); Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" ); @@ -44,12 +62,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor { this.name = name; this.poolSize = poolSize; -// final BlockingQueue queue = -// queueLength > 0 ? new ArrayBlockingQueue( queueLength ) : new SynchronousQueue(); -// -// executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) ); - - this.executorService = new NamedForkJoinPool(poolSize); + this.executorService = new NamedForkJoinPool( poolSize ); } @@ -57,126 +70,30 @@ public class NamedTaskExecutorImpl implements TaskExecutor { public Task submit( final Task task ) { try { - executorService.submit( task ); + executorService.submit( task ); } catch ( RejectedExecutionException ree ) { task.rejected(); } return task; - } - /** - * Callback for when the task succeeds or fails. - */ - private static final class TaskFutureCallBack implements FutureCallback { - - private final Task task; - - - private TaskFutureCallBack( Task task ) { - this.task = task; - } - - - @Override - public void onSuccess( @Nullable final V result ) { - LOG.debug( "Successfully completed task ", task ); - } - - - @Override - public void onFailure( final Throwable t ) { - LOG.error( "Unable to execute task. Exception is ", t ); - - task.exceptionThrown( t ); - } - } - - - private final class NamedForkJoinPool extends ForkJoinPool{ + private final class NamedForkJoinPool extends ForkJoinPool { private NamedForkJoinPool( final int workerThreadCount ) { //TODO, verify the scheduler at the end super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true ); } - - - } - private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler{ + + private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException( final Thread t, final Throwable e ) { LOG.error( "Uncaught exception on thread {} was {}", t, e ); } } - - - - - private final class NamedWorkerThread extends ForkJoinWorkerThread{ - - /** - * Creates a ForkJoinWorkerThread operating in the given pool. - * - * @param pool the pool this thread works in - * - * @throws NullPointerException if pool is null - */ - protected NamedWorkerThread(final String name, final ForkJoinPool pool ) { - super( pool ); - } - } - /** - * Create a thread pool that will reject work if our audit tasks become overwhelmed - */ - private final class MaxSizeThreadPool extends ThreadPoolExecutor { - - public MaxSizeThreadPool( BlockingQueue queue ) { - - super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ), - new RejectedHandler() ); - } - } - - - /** - * Thread factory that will name and count threads for easier debugging - */ - private final class CountingThreadFactory implements ThreadFactory { - - private final AtomicLong threadCounter = new AtomicLong(); - - - @Override - public Thread newThread( final Runnable r ) { - final long newValue = threadCounter.incrementAndGet(); - - Thread t = new Thread( r, name + "-" + newValue ); - - t.setDaemon( true ); - - return t; - } - } - - - /** - * The handler that will handle rejected executions and signal the interface - */ - private final class RejectedHandler implements RejectedExecutionHandler { - - - @Override - public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) { - LOG.warn( "{} task queue full, rejecting task {}", name, r ); - - throw new RejectedExecutionException( "Unable to run task, queue full" ); - } - - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java index eb04c2c..4dccc72 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java @@ -1,6 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.usergrid.persistence.core.task; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; @@ -28,6 +50,26 @@ public abstract class Task extends RecursiveTask { /** + * Fork all tasks in the list except the last one. The last will be run in the caller thread + * The others will wait for join + * @param tasks + * + */ + public > List joinAll(List tasks) throws ExecutionException, InterruptedException { + + //don't fork the last one + List results = new ArrayList<>(tasks.size()); + + for(T task: tasks){ + results.add( task.join() ); + } + + return results; + + } + + + /** * Execute the task */ public abstract V executeTask() throws Exception;