usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [1/2] incubator-usergrid git commit: Migrations work. Need to finish tests.
Date Tue, 03 Mar 2015 01:43:15 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-405 e96af464b -> fa69be86c


Migrations work.  Need to finish tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a55c784d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a55c784d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a55c784d

Branch: refs/heads/USERGRID-405
Commit: a55c784dac5beb624950f6b09746667e4431fbd8
Parents: e96af46
Author: Todd Nine <tnine@apigee.com>
Authored: Mon Mar 2 16:11:11 2015 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Mon Mar 2 17:19:27 2015 -0700

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |  12 +-
 .../impl/CollectionDataVersions.java            |   4 +-
 .../MvccEntitySerializationStrategyV1Impl.java  |   2 +-
 .../MvccEntitySerializationStrategyV2Impl.java  |   2 +-
 .../MvccEntitySerializationStrategyV3Impl.java  |   2 +-
 .../serialization/impl/SerializationModule.java |  78 ++--
 .../migration/CollectionMigrationPlugin.java    |  78 +++-
 .../migration/MvccEntityDataMigrationImpl.java  | 446 ++++++++++++++-----
 .../collection/guice/TestCollectionModule.java  |  17 +-
 .../persistence/core/guice/CommonModule.java    |   2 +
 .../data/MigrationInfoSerializationImpl.java    |   6 +-
 .../migration/data/newimpls/DataMigration2.java |  17 +-
 .../data/newimpls/MigrationPlugin.java          |   6 +-
 .../newimpls/TestMigrationDataProvider.java     |  61 +++
 14 files changed, 563 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index af5c4cf..7d78177 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -55,7 +55,7 @@ import com.google.inject.multibindings.Multibinder;
  *
  * @author tnine
  */
-public class CollectionModule extends AbstractModule {
+public abstract class CollectionModule extends AbstractModule {
 
 
     @Override
@@ -86,6 +86,8 @@ public class CollectionModule extends AbstractModule {
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
         bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class);
 
+        configureMigrationProvider();
+
     }
 
     @Provides
@@ -120,6 +122,14 @@ public class CollectionModule extends AbstractModule {
     }
 
 
+    /**
+     * Gives callers the ability to to configure an instance of
+     *
+     * MigrationDataProvider<EntityIdScope> for providing data migrations
+     */
+    public abstract void configureMigrationProvider();
+
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
index ee84b1e..6e8be45 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/CollectionDataVersions.java
@@ -28,9 +28,9 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
  * Versions of data as they exist across our system
  */
 public enum CollectionDataVersions{
+    ZERO(0),
     ONE(1),
-    TWO(2),
-    THREE(3);
+    TWO(2);
 
     private final int version;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
index a959508..1dab673 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -90,7 +90,7 @@ public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializati
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.ONE.getVersion();
+        return CollectionDataVersions.ZERO.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
index 04a1fea..1f65fcb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
@@ -93,7 +93,7 @@ public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializati
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.TWO.getVersion();
+        return CollectionDataVersions.ONE.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index 4e73119..7b8aac1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -383,7 +383,7 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
 
     @Override
     public int getImplementationVersion() {
-        return CollectionDataVersions.THREE.getVersion();
+        return CollectionDataVersions.TWO.getVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index fe804f3..f32f4f9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -34,7 +34,7 @@ import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Key;
-import com.google.inject.Provides;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
@@ -54,26 +54,32 @@ public class SerializationModule extends AbstractModule {
         //We've migrated this one, so we need to set up the previous, current, and proxy
 
 
-        bind(MvccEntitySerializationStrategy.class).annotatedWith( ProxyImpl.class ).to( MvccEntitySerializationStrategyProxyImpl.class );
-
-
+        bind( MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
+                                                     .to( MvccEntitySerializationStrategyProxyImpl.class );
 
 
         //bind all 3 implementations
-        bind(MvccEntitySerializationStrategyV1Impl.class);
-        bind(MvccEntitySerializationStrategyV2Impl.class);
-        bind(MvccEntitySerializationStrategyV3Impl.class);
+        bind( MvccEntitySerializationStrategyV1Impl.class );
+        bind( MvccEntitySerializationStrategyV2Impl.class );
+        bind( MvccEntitySerializationStrategyV3Impl.class );
+
 
+        bind( new TypeLiteral<VersionedMigrationSet<MvccEntitySerializationStrategy>>() {} )
+                .toProvider( MvccEntitySerializationStrategyProvider.class );
 
 
         //migrations
         //we want to make sure our generics are retained, so we use a typeliteral
-        Multibinder<DataMigration2<EntityIdScope>> dataMigrationMultibinder =  Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<EntityIdScope>>(){});
+        Multibinder<DataMigration2<EntityIdScope>> dataMigrationMultibinder =
+                Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<EntityIdScope>>() {} );
+
+
         dataMigrationMultibinder.addBinding().to( MvccEntityDataMigrationImpl.class );
 
 
         //wire up the collection migration plugin
-        Multibinder.newSetBinder( binder(), MigrationPlugin.class).addBinding().to( CollectionMigrationPlugin.class );
+        Multibinder.newSetBinder( binder(), MigrationPlugin.class ).addBinding().to( CollectionMigrationPlugin.class );
+
 
 
 
@@ -90,39 +96,53 @@ public class SerializationModule extends AbstractModule {
 
 
         //bind our settings as an eager singleton so it's checked on startup
-        bind(SettingsValidation.class).asEagerSingleton();
+        bind( SettingsValidation.class ).asEagerSingleton();
     }
 
 
-    /**
-     * Configure via explicit declaration the migration path we can follow
-     * @param v1
-     * @param v2
-     * @param v3
-     * @return
-     */
     @Singleton
-    @Inject
-    @Provides
-    public VersionedMigrationSet<MvccEntitySerializationStrategy> getVersions(final MvccEntitySerializationStrategyV1Impl v1, final MvccEntitySerializationStrategyV2Impl v2, final MvccEntitySerializationStrategyV3Impl v3){
+    public static final class MvccEntitySerializationStrategyProvider
+            implements Provider<VersionedMigrationSet<MvccEntitySerializationStrategy>> {
+
+
+        private final MvccEntitySerializationStrategyV1Impl v1;
+        private final MvccEntitySerializationStrategyV2Impl v2;
+        private final MvccEntitySerializationStrategyV3Impl v3;
+
+
+        @Inject
+        public MvccEntitySerializationStrategyProvider( final MvccEntitySerializationStrategyV1Impl v1,
+                                                         final MvccEntitySerializationStrategyV2Impl v2,
+                                                         final MvccEntitySerializationStrategyV3Impl v3 ) {
+            this.v1 = v1;
+            this.v2 = v2;
+            this.v3 = v3;
+        }
+
 
+        @Override
+        public VersionedMigrationSet<MvccEntitySerializationStrategy> get() {
 
-        //we must perform a migration from v1 to v3 in order to maintain consistency
-        MigrationRelationship<MvccEntitySerializationStrategy> v1Tov3 = new MigrationRelationship<>( v1, v3 );
+            //we must perform a migration from v1 to v3 in order to maintain consistency
+            MigrationRelationship<MvccEntitySerializationStrategy> v1Tov3 = new MigrationRelationship<>( v1, v3 );
 
-        //we must migrate from 2 to 3, this is a bridge that must happen to maintain data consistency
+            //we must migrate from 2 to 3, this is a bridge that must happen to maintain data consistency
 
-        MigrationRelationship<MvccEntitySerializationStrategy> v2Tov3 = new MigrationRelationship<>( v2, v3 );
+            MigrationRelationship<MvccEntitySerializationStrategy> v2Tov3 = new MigrationRelationship<>( v2, v3 );
 
 
-        //note that we MUST migrate to v3 before our next migration, if v4 and v5 is implemented we will need a v3->v5 and a v4->v5 set
-        MigrationRelationship<MvccEntitySerializationStrategy> current = new MigrationRelationship<MvccEntitySerializationStrategy>( v3, v3 );
+            //note that we MUST migrate to v3 before our next migration, if v4 and v5 is implemented we will need a
+            // v3->v5 and a v4->v5 set
+            MigrationRelationship<MvccEntitySerializationStrategy> current =
+                    new MigrationRelationship<MvccEntitySerializationStrategy>( v3, v3 );
 
 
-        //now create our set of versions
-        VersionedMigrationSet<MvccEntitySerializationStrategy> set = new VersionedMigrationSet<>( v1Tov3, v2Tov3, current );
+            //now create our set of versions
+            VersionedMigrationSet<MvccEntitySerializationStrategy> set =
+                    new VersionedMigrationSet<>( v1Tov3, v2Tov3, current );
 
-        return set;
 
+            return set;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
index 5d52a9f..d0663c2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
@@ -26,6 +26,11 @@ package org.apache.usergrid.persistence.collection.serialization.impl.migration;
 
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
@@ -42,17 +47,22 @@ import com.google.inject.Singleton;
 public class CollectionMigrationPlugin implements MigrationPlugin {
 
 
-    public static final String PLUGIN_NAME =  "collections-entity-data";
+    private static final Logger LOG = LoggerFactory.getLogger( CollectionMigrationPlugin.class );
+
+    public static final String PLUGIN_NAME = "collections-entity-data";
 
-    private final DataMigration2<EntityIdScope> entityDataMigration;
+    private final Set<DataMigration2<EntityIdScope>> entityDataMigrations;
     private final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider;
+    private final MigrationInfoSerialization migrationInfoSerialization;
 
 
     @Inject
-    public CollectionMigrationPlugin( final DataMigration2<EntityIdScope> entityDataMigration,
-                                      final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider ) {
-        this.entityDataMigration = entityDataMigration;
+    public CollectionMigrationPlugin( final Set<DataMigration2<EntityIdScope>> entityDataMigrations,
+                                      final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider,
+                                      final MigrationInfoSerialization migrationInfoSerialization ) {
+        this.entityDataMigrations = entityDataMigrations;
         this.entityIdScopeDataMigrationProvider = entityIdScopeDataMigrationProvider;
+        this.migrationInfoSerialization = migrationInfoSerialization;
     }
 
 
@@ -64,12 +74,66 @@ public class CollectionMigrationPlugin implements MigrationPlugin {
 
     @Override
     public void run( final ProgressObserver observer ) {
-       entityDataMigration.migrate( entityIdScopeDataMigrationProvider, observer );
+
+        //run until complete
+        while(runMigration( observer )){
+         LOG.info( "Migration complete, checking for next run" );
+        }
+
     }
 
 
     @Override
     public int getMaxVersion() {
-        return 0;
+
+        int max = 0;
+
+        for(DataMigration2<EntityIdScope> entityMigration: entityDataMigrations){
+            max = Math.max( max, entityMigration.getMaxVersion() );
+        }
+
+        return max;
+    }
+
+
+    /**
+     * Try to run the migration
+     *
+     * @return True if we ran a migration
+     */
+    private boolean runMigration( final ProgressObserver po ) {
+        DataMigration2<EntityIdScope> migrationToExecute = null;
+
+
+        final int version = migrationInfoSerialization.getVersion( PLUGIN_NAME );
+
+        for ( DataMigration2<EntityIdScope> entityMigration : entityDataMigrations ) {
+            if ( entityMigration.supports( version ) ) {
+                if ( migrationToExecute != null ) {
+                    throw new DataMigrationException(
+                            "Two migrations attempted to migration the same version, this is not allowed.  Class '"
+                                    + migrationToExecute.getClass().getName() + "' and class '" + entityMigration
+                                    .getClass().getName()
+                                    + "' both support this version. This means something is wired incorrectly" );
+                }
+
+                migrationToExecute = entityMigration;
+            }
+        }
+
+        if(migrationToExecute == null){
+            LOG.info( "No migrations found to execute" );
+            return false;
+        }
+
+        //run the migration
+        final int newSystemVersion = migrationToExecute.migrate( version, entityIdScopeDataMigrationProvider, po );
+
+        migrationInfoSerialization.setVersion( PLUGIN_NAME, newSystemVersion );
+
+        //signal we've run a migration and return
+        return true;
+
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 5250b5c..48e0195 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -26,10 +26,14 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
@@ -45,51 +49,68 @@ import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Subscriber;
+import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.functions.Func2;
+import rx.observables.GroupedObservable;
 import rx.schedulers.Schedulers;
 
 
 /**
  * Data migration strategy for entities
  */
+@Singleton
 public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope> {
 
     private final Keyspace keyspace;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MigrationInfoSerialization migrationInfoSerialization;
+    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
 
 
     @Inject
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-                                        final MigrationInfoSerialization migrationInfoSerialization ) {
+                                        final MigrationInfoSerialization migrationInfoSerialization,
+                                        final EntityVersionCleanupFactory entityVersionCleanupFactory,
+                                        final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3 ) {
 
         this.keyspace = keyspace;
         this.allVersions = allVersions;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.migrationInfoSerialization = migrationInfoSerialization;
+        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+        this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
     }
 
 
     @Override
-    public int getVersion() {
-        //get the max implementation version, since that's what we're going to
-        return allVersions.getMaxVersion( migrationInfoSerialization.getVersion( CollectionMigrationPlugin.PLUGIN_NAME ) );
+    public boolean supports( final int currentVersion ) {
+        //we can only migrate up to v3 with this implementation.  Beyond that, we should use a different migration
+        return currentVersion < mvccEntitySerializationStrategyV3.getImplementationVersion();
     }
 
 
     @Override
-    public void migrate( final MigrationDataProvider<EntityIdScope> migrationDataProvider,   final ProgressObserver observer ) {
+    public int getMaxVersion() {
+        return mvccEntitySerializationStrategyV3.getImplementationVersion();
+    }
+
+
+    @Override
+    public int migrate( final int currentVersion, final MigrationDataProvider<EntityIdScope> migrationDataProvider,
+                         final ProgressObserver observer ) {
 
         final AtomicLong atomicLong = new AtomicLong();
 
@@ -97,137 +118,326 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
 
         final UUID startTime = UUIDGenerator.newTimeUUID();
 
-        final MigrationRelationship<MvccEntitySerializationStrategy>
-                migration = allVersions.getMigrationRelationship( getCurrentSystemVersion() );
-
-        final long migrated = migrationDataProvider.getData().subscribeOn( Schedulers.io() )
-                   .parallel( new Func1<Observable<EntityIdScope>, Observable<Long>>() {
-
-
-                       //process the ids in parallel
-                       @Override
-                       public Observable<Long> call( final Observable<EntityIdScope> entityIdScopeObservable ) {
-
-
-                           return entityIdScopeObservable
-                                   .flatMap( new Func1<EntityIdScope, Observable<EntityToSaveMessage>>() {
-
-
-                                       @Override
-                                       public Observable<EntityToSaveMessage> call(
-                                               final EntityIdScope entityIdScope ) {
-
-                                           //load the entity
-                                           final CollectionScope currentScope = entityIdScope.getCollectionScope();
-
-
-
-                                           //for each element in our history, we need to copy it to v2.  Note that this migration
-                                           //won't support anything beyond V2
-
-                                           final Iterator<MvccEntity> allVersions = migration.from
-                                                   .loadAscendingHistory( currentScope, entityIdScope.getId(),
-                                                           startTime, 100 );
-
-                                           //emit all the entities
-                                           return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
-                                               @Override
-                                               public void call(
-                                                       final Subscriber<? super EntityToSaveMessage> subscriber ) {
-
-                                                   while ( allVersions.hasNext() ) {
-                                                       final EntityToSaveMessage message =
-                                                               new EntityToSaveMessage( currentScope,
-                                                                       allVersions.next() );
-                                                       subscriber.onNext( message );
-                                                   }
-
-                                                   subscriber.onCompleted();
-                                               }
-                                           } );
-                                       }
-                                   } ).buffer( 100 ).doOnNext( new Action1<List<EntityToSaveMessage>>() {
-                                       @Override
-                                       public void call( final List<EntityToSaveMessage> messages ) {
-                                           atomicLong.addAndGet( messages.size() );
-
-                                           final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+        final MigrationRelationship<MvccEntitySerializationStrategy> migration =
+                allVersions.getMigrationRelationship( currentVersion );
 
 
-                                           for ( EntityToSaveMessage message : messages ) {
-
-                                               final MutationBatch entityRewrite =
-                                                       migration.to.write( message.scope, message.entity );
-
-                                               //add to the total batch
-                                               totalBatch.mergeShallow( entityRewrite );
-
-                                               //write the unique values
-
-                                               if ( message.entity.getEntity().isPresent() ) {
-
-                                                   final Entity entity = message.entity.getEntity().get();
-
-                                                   final Id entityId = entity.getId();
-
-                                                   final UUID version = message.entity.getVersion();
-
-                                                   // re-write the unique values but this time with no TTL
-                                                   for ( Field field : EntityUtils
-                                                           .getUniqueFields( message.entity.getEntity().get() ) ) {
-
-                                                       UniqueValue written =
-                                                               new UniqueValueImpl( field, entityId, version );
-
-                                                       MutationBatch mb =
-                                                               uniqueValueSerializationStrategy.write( message.scope, written );
-
-
-                                                       // merge into our existing mutation batch
-                                                       totalBatch.mergeShallow( mb );
-                                                   }
-
-
-                                               }
-                                           }
-
-
-                                           executeBatch( totalBatch, observer, atomicLong );
-                                       }
-                                   } )
-                                           //count the results
-                                   .reduce( 0l, new Func2<Long, List<EntityToSaveMessage>, Long>() {
-                                       @Override
-                                       public Long call( final Long aLong, final List<EntityToSaveMessage> ids ) {
-                                           return aLong + ids.size();
-                                       }
-                                   } );
-                       }
-                   } ).toBlocking().last();
+        final long migrated = migrationDataProvider.getData().subscribeOn( Schedulers.io() )
+                                                   .parallel( new Func1<Observable<EntityIdScope>, Observable<Long>>() {
+
+
+                  //process the ids in parallel
+                  @Override
+                  public Observable<Long> call(
+                          final Observable<EntityIdScope>
+                                  entityIdScopeObservable ) {
+
+
+                      return entityIdScopeObservable.flatMap(
+                              new Func1<EntityIdScope,
+                                      Observable<EntityToSaveMessage>>() {
+
+
+                                  @Override
+                                  public
+                                  Observable<EntityToSaveMessage> call(
+                                          final EntityIdScope
+                                                  entityIdScope ) {
+
+                                      //load the entity
+                                      final CollectionScope
+                                              currentScope =
+                                              entityIdScope
+                                                      .getCollectionScope();
+
+
+                                      //for each element in our
+                                      // history, we need to copy it
+                                      // to v2.
+                                      // Note that
+                                      // this migration
+                                      //won't support anything beyond V2
+
+                                      final Iterator<MvccEntity>
+                                              allVersions =
+                                              migration.from
+                                                      .loadAscendingHistory(
+                                                              currentScope,
+                                                              entityIdScope
+                                                                      .getId(),
+                                                              startTime,
+                                                              100 );
+
+                                      //emit all the entity versions
+                                      return Observable.create(
+                                              new Observable
+                                                      .OnSubscribe<EntityToSaveMessage>() {
+                                                  @Override
+                                                  public void call(
+                                                          final
+                                                          Subscriber<? super
+                                                                  EntityToSaveMessage> subscriber ) {
+
+                                                      while ( allVersions
+                                                              .hasNext() ) {
+                                                          final
+                                                          EntityToSaveMessage
+                                                                  message =
+                                                                  new EntityToSaveMessage(
+                                                                          currentScope,
+                                                                          allVersions
+                                                                                  .next() );
+                                                          subscriber.onNext( message );
+                                                      }
+
+                                                      subscriber.onCompleted();
+                                                  }
+                                              } );
+                                  }
+                              } )
+
+
+                              //group them by entity id so we can get
+                              // the max for cleanup
+                              .groupBy(
+                                      new Func1<EntityToSaveMessage,
+                                              Id>() {
+                                          @Override
+                                          public Id call(
+                                                  final
+                                                  EntityToSaveMessage
+                                                          entityToSaveMessage ) {
+                                              return entityToSaveMessage.entity
+                                                      .getId();
+                                          }
+                                      } )
+                              //buffer up 10 of groups so we can put them all in a single mutation
+                              .buffer( 10 ).doOnNext(
+                                      new Action1<List<GroupedObservable<Id, EntityToSaveMessage>>>() {
+
+
+                                          @Override
+                                          public void call(
+                                                  final
+                                                  List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
+
+                                              atomicLong.addAndGet(
+                                                      groupedObservables
+                                                              .size() );
+
+                                              final MutationBatch
+                                                      totalBatch =
+                                                      keyspace.prepareMutationBatch();
+
+
+                                              //run each of the
+                                              // groups and add
+                                              // it ot the batch
+                                              Observable
+                                                      .from( groupedObservables )
+                                                      //emit the group as an observable
+                                                      .flatMap(
+                                                              new Func1<GroupedObservable<Id, EntityToSaveMessage>, Observable<EntityToSaveMessage>>() {
+
+
+                                                                  @Override
+                                                                  public Observable<EntityToSaveMessage> call(
+                                                                          final GroupedObservable<Id, EntityToSaveMessage> idEntityToSaveMessageGroupedObservable ) {
+                                                                      return idEntityToSaveMessageGroupedObservable
+                                                                              .asObservable();
+                                                                  }
+                                                              } )
+
+                                                      //merge and add the batch
+                                                      .doOnNext(
+                                                              new Action1<EntityToSaveMessage>() {
+                                                                  @Override
+                                                                  public void call(
+                                                                          final EntityToSaveMessage message ) {
+
+                                                                      final MutationBatch
+                                                                              entityRewrite =
+                                                                              migration.to.write( message.scope,
+                                                                                              message.entity );
+
+                                                                      //add to
+                                                                      // the
+                                                                      // total
+                                                                      // batch
+                                                                      totalBatch.mergeShallow( entityRewrite );
+
+                                                                      //write
+                                                                      // the
+                                                                      // unique values
+
+                                                                      if ( !message.entity
+                                                                              .getEntity()
+                                                                              .isPresent() ) {
+                                                                          return;
+                                                                      }
+
+                                                                          final Entity
+                                                                                  entity =
+                                                                                  message.entity
+                                                                                          .getEntity()
+                                                                                          .get();
+
+                                                                          final Id
+                                                                                  entityId =
+                                                                                  entity.getId();
+
+                                                                          final UUID
+                                                                                  version =
+                                                                                  message.entity
+                                                                                          .getVersion();
+
+                                                                          // re-write the unique values
+                                                                          // but this
+                                                                          // time with
+                                                                          // no TTL so that cleanup can clean up older values
+                                                                          for ( Field field : EntityUtils
+                                                                                  .getUniqueFields(
+                                                                                          message.entity
+                                                                                                  .getEntity()
+                                                                                                  .get() ) ) {
+
+                                                                              UniqueValue
+                                                                                      written =
+                                                                                      new UniqueValueImpl(
+                                                                                              field,
+                                                                                              entityId,
+                                                                                              version );
+
+                                                                              MutationBatch
+                                                                                      mb =
+                                                                                      uniqueValueSerializationStrategy
+                                                                                              .write( message.scope,
+                                                                                                      written );
+
+
+                                                                              // merge into our
+                                                                              // existing mutation
+                                                                              // batch
+                                                                              totalBatch
+                                                                                      .mergeShallow(
+                                                                                              mb );
+                                                                          }
+                                                                  }
+                                                              } )
+                                                      //once we've streamed everything, flush it
+                                                      .doOnCompleted(
+
+                                                              new Action0() {
+                                                                  @Override
+                                                                  public void call() {
+
+                                                                      executeBatch(migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+                                                                  }
+                                                              } )
+                                                      .toBlocking()
+                                                      .last();
+                                          }
+                                      } ).doOnNext(
+                                      new Action1<List<GroupedObservable<Id, EntityToSaveMessage>>>() {
+                                          @Override
+                                          public void call(
+                                                  final List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
+
+                                              for ( final GroupedObservable<Id, EntityToSaveMessage> group : groupedObservables ) {
+
+                                                  //get the highest
+                                                  // entity and run a
+                                                  // cleanup task on it
+                                                  final EntityToSaveMessage
+                                                          maxEntity =
+                                                          group.toBlocking()
+                                                               .last();
+
+                                                  final EntityVersionCleanupTask
+                                                          task =
+                                                          entityVersionCleanupFactory
+                                                                  .getTask(
+                                                                          maxEntity.scope,
+                                                                          maxEntity.entity
+                                                                                  .getId(),
+                                                                          maxEntity.entity
+                                                                                  .getVersion() );
+
+                                                  /**
+                                                   * just run the
+                                                   * call in this
+                                                   * process, we're
+                                                   * already
+                                                   * doing parallel
+                                                   * this forces a
+                                                   * repair of the
+                                                   * unique properties,
+                                                   and will bring us
+                                                   to a consistent
+                                                   state after the
+                                                   */
+
+                                                  try {
+                                                      task.call();
+                                                  }
+                                                  catch ( Exception e ) {
+                                                      throw new RuntimeException(
+                                                              "Unable to run cleanup task",
+                                                              e );
+                                                  }
+                                              }
+                                          }
+                                      } ).
+                                      reduce( 0l,
+                                              new Func2<Long, List<GroupedObservable<Id, EntityToSaveMessage>>, Long>() {
+
+                                                  @Override
+                                                  public Long call(
+                                                          final Long aLong,
+                                                          final List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
+
+                                                      long newCount =
+                                                              aLong;
+
+                                                      for ( GroupedObservable<Id, EntityToSaveMessage> group : groupedObservables ) {
+                                                          newCount +=
+                                                                  group.longCount()
+                                                                       .toBlocking()
+                                                                       .last();
+                                                      }
+
+                                                      return newCount;
+                                                  }
+                                              }
+
+
+                                            );
+                  }}).toBlocking().last();
 
         //now we update the progress observer
 
-        observer.update( getVersion(), "Finished for this step.  Migrated " + migrated + "entities total. ");
+        observer.update( migration.to.getImplementationVersion(), "Finished for this step.  Migrated " + migrated + "entities total. " );
+
+        return migration.to.getImplementationVersion();
     }
 
 
-    protected void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+    protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
         try {
             batch.execute();
 
-            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+            po.update(targetVersion, "Finished copying " + count + " entities to the new format" );
         }
         catch ( ConnectionException e ) {
-            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+            po.failed( targetVersion, "Failed to execute mutation in cassandra" );
             throw new DataMigrationException( "Unable to migrate batches ", e );
         }
     }
 
-    private int getCurrentSystemVersion(){
-       return migrationInfoSerialization.getVersion( CollectionMigrationPlugin.PLUGIN_NAME );
-    }
 
-    private static final class EntityToSaveMessage{
+
+
+    private static final class EntityToSaveMessage {
         private final CollectionScope scope;
         private final MvccEntity entity;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
index 7e1f32b..b42ad94 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
@@ -20,8 +20,15 @@
 package org.apache.usergrid.persistence.collection.guice;
 
 
+import java.util.Collections;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.TestMigrationDataProvider;
+
+import com.google.inject.TypeLiteral;
 
 
 public class TestCollectionModule extends TestModule {
@@ -30,7 +37,15 @@ public class TestCollectionModule extends TestModule {
     protected void configure() {
 
         install( new CommonModule() );
-        install( new CollectionModule() );
+        install( new CollectionModule() {
+            @Override
+            public void configureMigrationProvider() {
+                //configure our migration data provider
+
+                TestMigrationDataProvider<EntityIdScope> migrationDataProvider = new TestMigrationDataProvider<>();
+                bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).toInstance( migrationDataProvider );
+            }
+        } );
 
         /**
          * Test modules

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index 61b6b04..97c0479 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -76,6 +76,8 @@ public class CommonModule extends AbstractModule {
         bind( DataMigrationManager.class ).to( DataMigrationManagerImpl.class );
 
 
+        bind (MigrationInfoCache.class).to( MigrationInfoCacheImpl.class );
+
 
         //do multibindings for migrations
         //create the empty multibinder so other plugins can use it

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
index f976568..4a349fd 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
@@ -146,7 +146,7 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
         }
         //swallow, it doesn't exist
         catch ( NotFoundException nfe ) {
-            return -1;
+            return 0;
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to retrieve status", e );
@@ -180,7 +180,7 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
         }
         //swallow, it doesn't exist
         catch ( NotFoundException nfe ) {
-            return -1;
+            return 0;
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to retrieve status", e );
@@ -196,7 +196,7 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
         }
         //swallow, it doesn't exist
         catch ( NotFoundException nfe ) {
-            return -1;
+            return 0;
         }
         catch ( ConnectionException e ) {
             throw new DataMigrationException( "Unable to retrieve status", e );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
index 5830335..303c11c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
@@ -33,15 +33,26 @@ public interface DataMigration2<T> {
 
     /**
      * Perform the migration, returning an observable with a single emitted value
+     * @param currentVersion the current version of the system
      * @param migrationDataProvider
+     * @param observer The observer to receive updates of the progress
+     *
+     * @return The version that the system is now running
      */
-    public void migrate(MigrationDataProvider<T> migrationDataProvider, ProgressObserver observer);
+    public int migrate(final int currentVersion, MigrationDataProvider<T> migrationDataProvider, ProgressObserver observer);
 
     /**
-     * Get the version of this migration. It should be unique within the scope of the plugin
+     * Check if this version supports migration from the current system version.  If this returns false,
+     * migrate will not be invoked
      * @return
      */
-    public int getVersion();
+    public boolean supports(final int currentVersion);
+
+    /**
+     * Get the max version this migration can migrate to
+     * @return
+     */
+    public int getMaxVersion();
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
index 880cfd1..50dc91b 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
@@ -42,10 +42,10 @@ public interface MigrationPlugin {
      */
     public void run(ProgressObserver observer);
 
+
     /**
-     * Get the maximum migration version this plugin implements
+     * Get the max version this plugin supports
      * @return
      */
-    public int getMaxVersion();
-
+    int getMaxVersion();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a55c784d/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/newimpls/TestMigrationDataProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/newimpls/TestMigrationDataProvider.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/newimpls/TestMigrationDataProvider.java
new file mode 100644
index 0000000..9e99c6f
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/newimpls/TestMigrationDataProvider.java
@@ -0,0 +1,61 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+import java.util.Collection;
+
+import rx.Observable;
+
+
+/**
+ * A simple test class that will emit the provided test data when subscribed
+ * @param <T>
+ */
+public class TestMigrationDataProvider<T> implements MigrationDataProvider<T> {
+
+
+
+    //default to nothing so that we don't return null
+    private Observable<T> observable = Observable.empty();
+
+
+    public TestMigrationDataProvider(  ) {}
+
+
+    @Override
+    public Observable<T> getData() {
+       return observable;
+    }
+
+
+    /**
+     * Set this observable to return when invoked
+     *
+     * @param observable
+     */
+    public void setObservable( final Observable<T> observable ) {
+        this.observable = observable;
+    }
+}


Mime
View raw message