usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [12/50] [abbrv] incubator-usergrid git commit: Pushed migration tests down to core tiers. WIP
Date Thu, 19 Mar 2015 23:26:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
deleted file mode 100644
index f390723..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.usergrid.corepersistence.CpSetup;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.core.migration.data.*;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeDataMigrationImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.EntityWriteHelper;
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.inject.Injector;
-
-import net.jcip.annotations.NotThreadSafe;
-
-import rx.functions.Action1;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
-@NotThreadSafe
-public class GraphShardVersionMigrationIT extends AbstractCoreIT {
-
-    private Injector injector;
-    private ApplicationDataMigration graphShardVersionMigration;
-    private ManagerCache managerCache;
-    private DataMigrationManager dataMigrationManager;
-    private MigrationInfoSerialization migrationInfoSerialization;
-
-
-    /**
-     * Rule to do the resets we need
-     */
-    @Rule
-    public MigrationTestRule migrationTestRule = new MigrationTestRule( app,  SpringResource.getInstance().getBean(Injector.class) ,EdgeDataMigrationImpl.class  );
-    private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
-    private ApplicationObservable applicationObservable;
-
-
-    @Before
-    public void setup() {
-
-        injector =  SpringResource.getInstance().getBean( Injector.class );
-        graphShardVersionMigration = injector.getInstance( EdgeDataMigrationImpl.class );
-        managerCache = injector.getInstance( ManagerCache.class );
-        dataMigrationManager = injector.getInstance( DataMigrationManager.class );
-        migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-        allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
-        applicationObservable = injector.getInstance(ApplicationObservable.class);
-
-    }
-
-
-    @Test
-    @Ignore("Ignored awaiting fix for USERGRID-268")
-    public void testIdMapping() throws Throwable {
-
-        assertEquals( "version 2 expected", 2, graphShardVersionMigration.getVersion() );
-        assertEquals( "Previous version expected", 1, dataMigrationManager.getCurrentVersion());
-
-
-
-        final EntityManager newAppEm = app.getEntityManager();
-
-        final String type1 = "type1thing";
-        final String type2 = "type2thing";
-        final int size = 10;
-
-        final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
-        final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
-
-
-        final Set<Id> allEntities = new HashSet<>();
-        allEntities.addAll( type1Identities );
-        allEntities.addAll( type2Identities );
-
-
-        final TestProgressObserver progressObserver = new TestProgressObserver();
-
-
-        //used to validate 1.0 types, and 2.0 types
-        final Multimap<Id, String> sourceTypes = HashMultimap.create( 10000, 10 );
-        final Multimap<Id, String> targetTypes = HashMultimap.create( 10000, 10 );
-
-
-        //read everything in previous version format and put it into our types.
-
-        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
-                                     .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
-                                         @Override
-                                         public void call(
-                                                 final ApplicationEntityGroup<CollectionScope> entity ) {
-
-                                             final GraphManager gm =
-                                                     managerCache.getGraphManager( entity.applicationScope );
-
-                                             for ( final EntityIdScope<CollectionScope> idScope : entity.entityIds ) {
-                                                 /**
-                                                  * Get our edge types from the source
-                                                  */
-                                                 gm.getEdgeTypesFromSource( new SimpleSearchEdgeType(idScope.getId(), null, null ) )
-                                                   .doOnNext(new Action1<String>() {
-                                                       @Override
-                                                       public void call(final String s) {
-                                                           sourceTypes.put(idScope.getId(), s);
-                                                       }
-                                                   }).toBlocking().lastOrDefault( null );
-
-
-                                                 /**
-                                                  * Get the edge types to the target
-                                                  */
-                                                 gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( idScope.getId(), null, null ) )
-                                                   .doOnNext( new Action1<String>() {
-                                                       @Override
-                                                       public void call( final String s ) {
-                                                           targetTypes.put( idScope.getId(), s );
-                                                       }
-                                                   } ).toBlocking().lastOrDefault( null );
-
-                                                 allEntities.remove( idScope.getId() );
-                                             }
-                                         }
-                                     } ).toBlocking().lastOrDefault( null );
-
-
-        //perform the migration
-
-        graphShardVersionMigration.migrate(applicationObservable.getAllApplicationScopes(), progressObserver).toBlocking().last();
-
-        assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
-        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
-        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-
-
-        //write the status and version, then invalidate the cache so it appears
-        migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
-        migrationInfoSerialization.setVersion( graphShardVersionMigration.getVersion() );
-        dataMigrationManager.invalidate();
-
-        assertEquals( "New version saved, and we should get new implementation",
-                graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion() );
-
-
-        //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
-        allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
-                                     .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
-                                                    @Override
-                                                    public void call(
-                                                            final ApplicationEntityGroup<CollectionScope> entity ) {
-
-                                                        final GraphManager gm =
-                                                                managerCache.getGraphManager( entity.applicationScope );
-
-                                                        for ( final EntityIdScope<CollectionScope> idScope : entity.entityIds ) {
-                                                            /**
-                                                             * Get our edge types from the source
-                                                             */
-                                                            gm.getEdgeTypesFromSource(
-                                                                    new SimpleSearchEdgeType( idScope.getId(), null, null ) )
-                                                              .doOnNext( new Action1<String>() {
-                                                                  @Override
-                                                                  public void call( final String s ) {
-                                                                      sourceTypes.remove( idScope.getId(), s );
-                                                                  }
-                                                              } ).toBlocking().lastOrDefault( null );
-
-
-                                                            /**
-                                                             * Get the edge types to the target
-                                                             */
-                                                            gm.getEdgeTypesToTarget(
-                                                                    new SimpleSearchEdgeType( idScope.getId(), null, null ) )
-                                                              .doOnNext( new Action1<String>() {
-                                                                  @Override
-                                                                  public void call( final String s ) {
-                                                                      targetTypes.remove( idScope.getId(), s );
-                                                                  }
-                                                              } ).toBlocking().lastOrDefault( null );
-                                                        }
-                                                    }
-                                                }
-
-
-                                              ).toBlocking().lastOrDefault( null );
-
-
-        assertEquals( "All source types migrated", 0, sourceTypes.size() );
-
-
-        assertEquals( "All target types migrated", 0, targetTypes.size() );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java
index 3071b72..67f89ab 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java
@@ -25,8 +25,9 @@ import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
 
 import org.apache.usergrid.CoreApplication;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Injector;
@@ -45,7 +46,7 @@ public class MigrationTestRule extends ExternalResource {
 
     protected final CoreApplication core;
     protected final DataMigrationManager dataMigrationManager;
-    protected final DataMigration dataMigration;
+    protected final DataMigration2 dataMigration;
 
     protected int currentVersion;
 
@@ -55,10 +56,8 @@ public class MigrationTestRule extends ExternalResource {
      *
      * @param core the CoreApplication rule used in this test
      * @param injector The injector used in this test
-     * @param dataMigrationClass The data migration class that is under test
      */
-    public MigrationTestRule( final CoreApplication core, final Injector injector,
-                              final Class<? extends DataMigration> dataMigrationClass ) {
+    public MigrationTestRule( final CoreApplication core, final Injector injector ) {
         this.core = core;
         this.dataMigrationManager = injector.getInstance( DataMigrationManager.class );
         this.dataMigration = injector.getInstance( dataMigrationClass );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
deleted file mode 100644
index c7b69a1..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-
-
-public class TestProgressObserver implements DataMigration.ProgressObserver {
-
-    private boolean failed = false;
-
-
-    private List<String> updates = new ArrayList<>( 100 );
-
-
-    @Override
-    public void failed( final int migrationVersion, final String reason ) {
-        failed = true;
-    }
-
-
-    @Override
-    public void failed( final int migrationVersion, final String reason, final Throwable throwable ) {
-        failed = true;
-    }
-
-
-    @Override
-    public void update( final int migrationVersion, final String message ) {
-        updates.add( message );
-    }
-
-
-    /**
-     * Get if we failed
-     * @return
-     */
-    public boolean getFailed() {
-        return failed;
-    }
-
-
-    /**
-     * Get update messages
-     * @return
-     */
-    public List<String> getUpdates() {
-        return updates;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index ca14bd0..7a6ec22 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -23,26 +23,20 @@ package org.apache.usergrid.corepersistence.rx;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Injector;
@@ -59,12 +53,10 @@ import static org.junit.Assert.assertTrue;
  */
 public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
-    private final Logger logger = LoggerFactory.getLogger( AllEntitiesInSystemObservableIT.class );
-
     @Test
     public void testEntities() throws Exception {
         Injector injector =  SpringResource.getInstance().getBean(Injector.class);
-        AllEntitiesInSystemObservable allEntitiesInSystemObservableImpl =injector.getInstance(AllEntitiesInSystemObservable.class);
+        AllEntitiesInSystemImpl allEntitiesInSystemObservableImpl =injector.getInstance(AllEntitiesInSystemImpl.class);
         TargetIdObservable targetIdObservable = injector.getInstance(TargetIdObservable.class);
 
         final EntityManager em = app.getEntityManager();
@@ -99,34 +91,25 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
 
         final ApplicationScope scope = CpNamingUtils.getApplicationScope( app.getId() );
-        final Id applicationId = scope.getApplication();
 
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
+        allEntitiesInSystemObservableImpl.getData().doOnNext( new Action1<EntityIdScope>() {
             @Override
-            public void call( final ApplicationEntityGroup<CollectionScope> entity ) {
-
-                assertNotNull(entity);
-                assertNotNull(entity.applicationScope);
-                assertNotNull(entity.entityIds);
-
-                //not from our test, don't check it
-                if(!applicationId.equals( entity.applicationScope.getApplication() )){
-                    return;
-                }
-
-                for(EntityIdScope<CollectionScope> idScope : entity.entityIds) {
+            public void call( final EntityIdScope entityIdScope ) {
+                assertNotNull(entityIdScope);
+                assertNotNull(entityIdScope.getCollectionScope());
+                assertNotNull(entityIdScope.getId());
 
                     //we should only emit each node once
-                    if ( idScope.getId().getType().equals( type1 ) ) {
-                        assertTrue( "Element should be present on removal", type1Identities.remove(idScope.getId() ) );
+                    if ( entityIdScope.getId().getType().equals( type1 ) ) {
+                        assertTrue( "Element should be present on removal", type1Identities.remove(entityIdScope.getId() ) );
                     }
-                    else if ( idScope.getId().getType().equals( type2 ) ) {
-                        assertTrue( "Element should be present on removal", type2Identities.remove(idScope.getId() ) );
+                    else if ( entityIdScope.getId().getType().equals( type2 ) ) {
+                        assertTrue( "Element should be present on removal", type2Identities.remove(entityIdScope.getId() ) );
                     }
-                }
+
             }
         } ).toBlocking().lastOrDefault( null );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
index 081bbed..649f518 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
@@ -24,15 +24,15 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Application;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Injector;
 
@@ -52,7 +52,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
 
         final Application createdApplication = app.getEntityManager().getApplication();
 
-        ApplicationObservable applicationObservable =SpringResource.getInstance().getBean(Injector.class).getInstance(ApplicationObservable.class);
+        AllApplicationsObservable applicationObservable =SpringResource.getInstance().getBean(Injector.class).getInstance(AllApplicationsObservable.class);
 
         //now our get all apps we expect.  There may be more, but we don't care about those.
         final Set<UUID> applicationIds = new HashSet<UUID>() {{
@@ -67,13 +67,13 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
         //clean up our wiring
         ManagerCache managerCache = SpringResource.getInstance().getBean( Injector.class ).getInstance( ManagerCache.class );
 
-        Observable<Id> appObservable = applicationObservable.getAllApplicationIds();
+        Observable<ApplicationScope> appObservable = applicationObservable.getAllApplications();
 
-        appObservable.doOnNext( new Action1<Id>() {
+        appObservable.doOnNext( new Action1<ApplicationScope>() {
             @Override
-            public void call( final Id id ) {
-                applicationIds.remove( id.getUuid() );
-                assertEquals("Correct application type expected" ,  Application.ENTITY_TYPE, id.getType() );
+            public void call( final ApplicationScope id ) {
+                applicationIds.remove( id.getApplication().getUuid() );
+                assertEquals("Correct application type expected" ,  Application.ENTITY_TYPE, id.getApplication().getType() );
             }
         } ).toBlocking().lastOrDefault( null );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java
index e518298..6d32d73 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java
@@ -65,6 +65,7 @@ public interface MvccLogEntry {
         /**
          * The logentry being written represents a partial entity
          */
+        @Deprecated//removed in v3
         PARTIAL(1),
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 7d78177..d5478d4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -89,28 +89,15 @@ public abstract class CollectionModule extends AbstractModule {
         configureMigrationProvider();
 
     }
-
-    @Provides
-    @Singleton
-    @Inject
-    @Write
-    public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
-        final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
-
-        return writeStart;
-    }
-
-
-    //TODO USERGRID-405, remove this, it's no longer supported
-    @Provides
-    @Singleton
-    @Inject
-    @WriteUpdate
-    public WriteStart writeUpdate (final MvccLogEntrySerializationStrategy logStrategy) {
-        final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.PARTIAL );
-
-        return writeStart;
-    }
+//
+//    @Provides
+//    @Singleton
+//    @Inject
+//    public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
+//        final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
+//
+//        return writeStart;
+//    }
 
     @Inject
     @Singleton

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/Write.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/Write.java
deleted file mode 100644
index ad752af..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/Write.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface Write {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/WriteUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/WriteUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/WriteUpdate.java
deleted file mode 100644
index 0ba3991..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/WriteUpdate.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface WriteUpdate {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index fa36f42..10d85f8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -32,8 +32,6 @@ import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
 import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
 import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.collection.guice.Write;
-import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
@@ -66,7 +64,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
 
 
     private final WriteStart writeStart;
-    private final WriteStart writeUpdate;
     private final WriteUniqueVerify writeVerifyUnique;
     private final WriteOptimisticVerify writeOptimisticVerify;
     private final WriteCommit writeCommit;
@@ -91,7 +88,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
 
                                   //create the target EM that will perform logic
                             final EntityCollectionManager target = new EntityCollectionManagerImpl(
-                                writeStart, writeUpdate, writeVerifyUnique,
+                                writeStart, writeVerifyUnique,
                                 writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
                                 mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionCleanupFactory,
@@ -107,8 +104,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
 
 
     @Inject
-    public EntityCollectionManagerFactoryImpl( @Write final WriteStart writeStart,
-                                               @WriteUpdate final WriteStart writeUpdate,
+    public EntityCollectionManagerFactoryImpl( final WriteStart writeStart,
                                                final WriteUniqueVerify writeVerifyUnique,
                                                final WriteOptimisticVerify writeOptimisticVerify,
                                                final WriteCommit writeCommit, final RollbackAction rollback,
@@ -125,7 +121,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final SerializationFig serializationFig) {
 
         this.writeStart = writeStart;
-        this.writeUpdate = writeUpdate;
         this.writeVerifyUnique = writeVerifyUnique;
         this.writeOptimisticVerify = writeOptimisticVerify;
         this.writeCommit = writeCommit;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index e483cc1..f496cac 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -28,12 +28,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
 import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.collection.guice.Write;
-import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@ -44,11 +45,14 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -65,12 +69,6 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.CqlResult;
 import com.netflix.astyanax.serializers.StringSerializer;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import rx.Observable;
 import rx.Subscriber;
@@ -92,7 +90,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
     //start stages
     private final WriteStart writeStart;
-    private final WriteStart writeUpdate;
     private final WriteUniqueVerify writeVerifyUnique;
     private final WriteOptimisticVerify writeOptimisticVerify;
     private final WriteCommit writeCommit;
@@ -117,8 +114,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
     @Inject
     public EntityCollectionManagerImpl(
-        @Write final WriteStart                    writeStart,
-        @WriteUpdate final WriteStart              writeUpdate,
+        final WriteStart                    writeStart,
         final WriteUniqueVerify                    writeVerifyUnique,
         final WriteOptimisticVerify                writeOptimisticVerify,
         final WriteCommit                          writeCommit,
@@ -142,7 +138,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         MvccValidationUtils.validateCollectionScope( collectionScope );
 
         this.writeStart = writeStart;
-        this.writeUpdate = writeUpdate;
         this.writeVerifyUnique = writeVerifyUnique;
         this.writeOptimisticVerify = writeOptimisticVerify;
         this.writeCommit = writeCommit;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
index 8cd21e1..92dc69d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
@@ -38,7 +38,6 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo
 
     private final MvccLogEntrySerializationStrategy logStrategy;
 
-    MvccEntity.Status status;
 
 
     /**
@@ -46,9 +45,8 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo
      */
 
     @Inject
-    public WriteStart ( final MvccLogEntrySerializationStrategy logStrategy, MvccEntity.Status status) {
+    public WriteStart ( final MvccLogEntrySerializationStrategy logStrategy) {
         this.logStrategy = logStrategy;
-        this.status = status;
 
     }
 
@@ -69,7 +67,7 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo
 
             MutationBatch write = logStrategy.write( collectionScope, startEntry );
 
-            final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, newVersion, status, entity );
+            final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, newVersion, MvccEntity.Status.COMPLETE, entity );
             if(ioEvent.getEvent().hasVersion()) {
                 try {
                     write.execute();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 108a4d8..7580a26 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -334,56 +334,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
                                                       .toBlocking()
                                                       .last();
                                           }
-                                      } ).doOnNext(
-                                      new Action1<List<GroupedObservable<Id, EntityToSaveMessage>>>() {
-                                          @Override
-                                          public void call(
-                                                  final List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
-
-                                              for ( final GroupedObservable<Id, EntityToSaveMessage> group : groupedObservables ) {
-
-                                                  //get the highest
-                                                  // entity and run a
-                                                  // cleanup task on it
-                                                  final EntityToSaveMessage
-                                                          maxEntity =
-                                                          group.toBlocking()
-                                                               .last();
-
-                                                  final EntityVersionCleanupTask
-                                                          task =
-                                                          entityVersionCleanupFactory
-                                                                  .getTask(
-                                                                          maxEntity.scope,
-                                                                          maxEntity.entity
-                                                                                  .getId(),
-                                                                          maxEntity.entity
-                                                                                  .getVersion() );
-
-                                                  /**
-                                                   * just run the
-                                                   * call in this
-                                                   * process, we're
-                                                   * already
-                                                   * doing parallel
-                                                   * this forces a
-                                                   * repair of the
-                                                   * unique properties,
-                                                   and will bring us
-                                                   to a consistent
-                                                   state after the
-                                                   */
-
-                                                  try {
-                                                      task.call();
-                                                  }
-                                                  catch ( Exception e ) {
-                                                      throw new RuntimeException(
-                                                              "Unable to run cleanup task",
-                                                              e );
-                                                  }
-                                              }
-                                          }
                                       } ).
                                       reduce( 0l,
                                               new Func2<Long, List<GroupedObservable<Id, EntityToSaveMessage>>, Long>() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java
index a683d23..c00c82a 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java
@@ -68,7 +68,7 @@ public class WriteStartTest extends AbstractEntityStageTest {
         final Entity entity = TestEntityGenerator.generateEntity();
 
         //run the stage
-        WriteStart newStage = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
+        WriteStart newStage = new WriteStart( logStrategy);
 
 
         //verify the observable is correct
@@ -113,7 +113,7 @@ public class WriteStartTest extends AbstractEntityStageTest {
         //set up the mock to return the entity from the start phase
         final Entity entity = TestEntityGenerator.generateEntity(new SimpleId(UUID.randomUUID(),"test"),null);
         //run the stage
-        WriteStart newStage = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
+        WriteStart newStage = new WriteStart( logStrategy );
 
         //verify the observable is correct
         CollectionIoEvent<MvccEntity> result = newStage.call( new CollectionIoEvent<Entity>( context, entity ) );
@@ -140,7 +140,7 @@ public class WriteStartTest extends AbstractEntityStageTest {
     protected void validateStage( final CollectionIoEvent<Entity> event ) {
         final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
 
-        new WriteStart( logStrategy, MvccEntity.Status.COMPLETE ).call( event );
+        new WriteStart( logStrategy ).call( event );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
new file mode 100644
index 0000000..dde67cb
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+
+import java.util.UUID;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.CollectionDataVersions;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV1Impl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
+import org.apache.usergrid.persistence.core.guice.DataMigrationResetRule;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.TestProgressObserver;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import rx.Observable;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+
+@NotThreadSafe
+@RunWith( ITRunner.class )
+@UseModules( { TestCollectionModule.class } )
+public class MvccEntityDataMigrationV1ToV3ImplTest implements DataMigrationResetRule.DataMigrationManagerProvider {
+
+    @Inject
+    public DataMigrationManager dataMigrationManager;
+
+    @Inject
+    private MvccEntitySerializationStrategyV1Impl v1Impl;
+
+    @Inject
+    private MvccEntitySerializationStrategyV3Impl v3Impl;
+
+    @Inject
+    public MvccEntityDataMigrationImpl mvccEntityDataMigrationImpl;
+
+    @Inject
+    public VersionedMigrationSet<MvccEntitySerializationStrategy> versions;
+
+    /**
+     * Rule to do the resets we need
+     */
+    @Rule
+    public DataMigrationResetRule migrationTestRule =
+        new DataMigrationResetRule( this, CollectionMigrationPlugin.PLUGIN_NAME,
+            CollectionDataVersions.INITIAL.getVersion() );
+
+
+    @Test
+    public void testMigration() throws ConnectionException {
+
+        final Id applicationId = createId("application");
+        final String collectionName = "things";
+
+        CollectionScope scope = new CollectionScopeImpl(applicationId, applicationId, collectionName );
+
+        final MvccEntity entity1 = getEntity( "thing" );
+        final MvccEntity entity2 = getEntity( "thing" );
+
+        v1Impl.write( scope, entity1 ).execute();
+        v1Impl.write( scope, entity2 ).execute();
+
+
+        MvccEntity returned1 = v1Impl.load( scope, entity1.getId() ).get();
+        MvccEntity returned2 = v1Impl.load( scope, entity2.getId() ).get();
+
+        assertEquals("Same entity", entity1, returned1);
+        assertEquals("Same entity", entity2, returned2);
+
+        final Observable<EntityIdScope> entityIdScope = Observable.from( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
+
+
+        final MigrationDataProvider<EntityIdScope> migrationProvider = new MigrationDataProvider<EntityIdScope>() {
+            @Override
+            public Observable<EntityIdScope> getData() {
+                return entityIdScope;
+            }
+        };
+
+        final TestProgressObserver progressObserver = new TestProgressObserver();
+
+        //now migration
+        final int newVersion = mvccEntityDataMigrationImpl.migrate( CollectionDataVersions.INITIAL.getVersion(), migrationProvider, progressObserver  );
+
+
+        assertEquals( "Correct version returned", newVersion, CollectionDataVersions.LOG_REMOVAL.getVersion() );
+        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+        //now verify we can read the data correctly in the new version
+        returned1 = v3Impl.load( scope, entity1.getId() ).get();
+           returned2 = v3Impl.load( scope, entity2.getId() ).get();
+
+           assertEquals("Same entity", entity1, returned1);
+           assertEquals("Same entity", entity2, returned2);
+
+        //verify the tuple is correct
+
+        final MigrationRelationship<MvccEntitySerializationStrategy>
+            tuple = versions.getMigrationRelationship( newVersion );
+
+
+        assertSame("Same instance for from", v1Impl, tuple.from);
+        assertSame("Same instance for to", v3Impl, tuple.to);
+
+
+
+    }
+
+
+    private MvccEntity getEntity(final String type){
+
+        final SimpleId entityId = new SimpleId( type );
+        final UUID version = UUIDGenerator.newTimeUUID();
+        final Entity entity = new Entity( entityId );
+
+        MvccEntityImpl logEntry = new MvccEntityImpl( entityId, version, MvccEntity.Status.COMPLETE, entity );
+
+
+        return logEntry;
+
+
+    }
+
+
+
+
+
+    @Override
+    public DataMigrationManager getDataMigrationManager() {
+        return dataMigrationManager;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/DataMigrationResetRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/DataMigrationResetRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/DataMigrationResetRule.java
new file mode 100644
index 0000000..0167ff6
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/DataMigrationResetRule.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.guice;
+
+
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * A test rule that will set up a specific version for the plugin before test invocation
+ * then set it back afterwards
+ */
+public class DataMigrationResetRule extends ExternalResource {
+    private static final Logger logger = LoggerFactory.getLogger( DataMigrationResetRule.class );
+
+
+    private DataMigrationManagerProvider dataMigrationManagerProvider;
+
+    private final String pluginName;
+
+    private final int versionToSet;
+
+    private int existingVersion = -1;
+
+
+    public DataMigrationResetRule( final DataMigrationManagerProvider dataMigrationManagerProvider, final String pluginName, final int versionToSet ) {
+        this.dataMigrationManagerProvider = dataMigrationManagerProvider;
+        this.pluginName = pluginName;
+        this.versionToSet = versionToSet;
+    }
+
+
+
+    @Override
+    protected void before() throws MigrationException {
+
+        existingVersion = dataMigrationManagerProvider.getDataMigrationManager().getCurrentVersion( pluginName );
+
+        dataMigrationManagerProvider.getDataMigrationManager().resetToVersion( pluginName, versionToSet );
+
+        logger.info( "Migration complete" );
+    }
+
+
+    @Override
+    protected void after() {
+        dataMigrationManagerProvider.getDataMigrationManager().resetToVersion( pluginName, existingVersion );
+    }
+
+
+    /**
+     * Interface for getting a data migration manager during testing. Ugly, but required because we
+     * can't inject into this member
+     */
+    public static interface DataMigrationManagerProvider{
+
+        /**
+         * Get the data migration manager
+         * @return
+         */
+        public DataMigrationManager getDataMigrationManager();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java
new file mode 100644
index 0000000..63fad42
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.migration.data;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+
+
+public class TestProgressObserver implements ProgressObserver {
+
+    private boolean failed = false;
+
+
+    private List<String> updates = new ArrayList<>( 100 );
+
+
+    @Override
+    public void failed( final int migrationVersion, final String reason ) {
+        failed = true;
+    }
+
+
+    @Override
+    public void failed( final int migrationVersion, final String reason, final Throwable throwable ) {
+        failed = true;
+    }
+
+
+    @Override
+    public void update( final int migrationVersion, final String message ) {
+        updates.add( message );
+    }
+
+
+    /**
+     * Get if we failed
+     * @return
+     */
+    public boolean getFailed() {
+        return failed;
+    }
+
+
+    /**
+     * Get update messages
+     * @return
+     */
+    public List<String> getUpdates() {
+        return updates;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/util/IdGenerator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/util/IdGenerator.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/util/IdGenerator.java
new file mode 100644
index 0000000..1b117a9
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/util/IdGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.util;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+
+/**
+ * Test helper for generating ids
+ */
+public class IdGenerator {
+    /**
+     * Create the id
+     */
+    public static Id createId( String type ) {
+        return createId( UUIDGenerator.newTimeUUID(), type );
+    }
+
+
+    /**
+     * Generate an ID with the type and id
+     *
+     * @param id The uuid in the id
+     * @param type The type of id
+     */
+    public static Id createId( UUID id, String type ) {
+        return new SimpleId( id, type );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 67459b2..d504e47 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -47,7 +47,7 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeDataMigrationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.EdgeDataMigrationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV1Impl;
 import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV2Impl;
@@ -57,6 +57,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.GraphManagerFact
 import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.TargetIdObservableImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
@@ -130,8 +131,8 @@ public abstract class GraphModule extends AbstractModule {
 
 
         //wire up the edg migration
-        Multibinder<DataMigration2<ApplicationScope>> dataMigrationMultibinder =
-                Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<ApplicationScope>>() {} );
+        Multibinder<DataMigration2<GraphNode>> dataMigrationMultibinder =
+                Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<GraphNode>>() {} );
 
 
         dataMigrationMultibinder.addBinding().to( EdgeDataMigrationImpl.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
deleted file mode 100644
index 49a954d..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.graph.serialization.impl;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.functions.Func2;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Encapsulates the migration of edge meta data
- */
-public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
-
-    private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
-
-    private final Keyspace keyspace;
-    private final GraphManagerFactory graphManagerFactory;
-    private final EdgesObservable edgesFromSourceObservable;
-    private final VersionedMigrationSet<EdgeMetadataSerialization> allVersions;
-    private final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2;
-
-    @Inject
-    public EdgeDataMigrationImpl( final Keyspace keyspace, final GraphManagerFactory graphManagerFactory,
-                                  final EdgesObservable edgesFromSourceObservable,
-
-                                  final VersionedMigrationSet<EdgeMetadataSerialization> allVersions,
-                                  final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2 ) {
-
-        this.keyspace = keyspace;
-        this.graphManagerFactory = graphManagerFactory;
-        this.edgesFromSourceObservable = edgesFromSourceObservable;
-        this.allVersions = allVersions;
-        this.edgeMetadataSerializationV2 = edgeMetadataSerializationV2;
-    }
-
-
-
-
-    @Override
-       public int migrate( final int currentVersion, final MigrationDataProvider<ApplicationScope> migrationDataProvider,
-                           final ProgressObserver observer ) {
-
-        final AtomicLong counter = new AtomicLong();
-
-        final MigrationRelationship<EdgeMetadataSerialization>
-                migration = allVersions.getMigrationRelationship( currentVersion );
-
-        final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap(new Func1<ApplicationScope, Observable<List<Edge>>>() {
-                  @Override
-                  public Observable<List<Edge>> call(final ApplicationScope applicationScope) {
-                      final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-                      final Observable<Edge> edgesFromSource =
-                              edgesFromSourceObservable.edgesFromSource( gm, applicationScope.getApplication() );
-                      logger.info( "Migrating edges scope {}", applicationScope );
-
-                      //get each edge from this node as a source
-                      return edgesFromSource
-
-                              //for each edge, re-index it in v2  every 1000 edges or less
-                              .buffer( 1000 ).parallel( new Func1<Observable<List<Edge>>, Observable<List<Edge>>>() {
-                                  @Override
-                                  public Observable<List<Edge>> call( final Observable<List<Edge>> listObservable ) {
-                                      return listObservable.doOnNext( new Action1<List<Edge>>() {
-                                          @Override
-                                          public void call( List<Edge> edges ) {
-                                              final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                              for ( Edge edge : edges ) {
-                                                  logger.info( "Migrating meta for edge {}", edge );
-                                                  final MutationBatch edgeBatch =
-                                                          migration.to.writeEdge( applicationScope, edge );
-                                                  batch.mergeShallow( edgeBatch );
-                                              }
-
-                                              try {
-                                                  batch.execute();
-                                              }
-                                              catch ( ConnectionException e ) {
-                                                  throw new RuntimeException( "Unable to perform migration", e );
-                                              }
-
-                                              //update the observer so the admin can see it
-                                              final long newCount = counter.addAndGet( edges.size() );
-
-                                              observer.update( migration.to.getImplementationVersion(),
-                                                      String.format( "Currently running.  Rewritten %d edge types",
-                                                              newCount ) );
-                                          }
-                                      } );
-                                  }
-                              } );
-                  }});
-
-        observable.longCount().toBlocking().last();
-
-        return migration.to.getImplementationVersion();
-
-    }
-
-
-
-
-    @Override
-    public boolean supports( final int currentVersion ) {
-        return currentVersion < edgeMetadataSerializationV2.getImplementationVersion();
-    }
-
-
-    @Override
-    public int getMaxVersion() {
-        //we only support up to v2 ATM
-        return edgeMetadataSerializationV2.getImplementationVersion();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
new file mode 100644
index 0000000..af157a6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -0,0 +1,149 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.migration;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV2Impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Encapsulates the migration of edge meta data.
+ *
+ * The migration data provider assumes that is will visit every node in the graph
+ * all edges from these source node will then be re-indexed.
+ */
+public class EdgeDataMigrationImpl implements DataMigration2<GraphNode> {
+
+    private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
+
+    private final Keyspace keyspace;
+    private final GraphManagerFactory graphManagerFactory;
+    private final EdgesObservable edgesFromSourceObservable;
+    private final VersionedMigrationSet<EdgeMetadataSerialization> allVersions;
+    private final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2;
+
+    @Inject
+    public EdgeDataMigrationImpl( final Keyspace keyspace,
+                                  final GraphManagerFactory graphManagerFactory,
+                                  final EdgesObservable edgesFromSourceObservable,
+                                  final VersionedMigrationSet<EdgeMetadataSerialization> allVersions,
+                                  final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2 ) {
+
+        this.keyspace = keyspace;
+        this.graphManagerFactory = graphManagerFactory;
+        this.edgesFromSourceObservable = edgesFromSourceObservable;
+        this.allVersions = allVersions;
+        this.edgeMetadataSerializationV2 = edgeMetadataSerializationV2;
+    }
+
+
+
+
+    @Override
+       public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode> migrationDataProvider,
+                           final ProgressObserver observer ) {
+
+        final AtomicLong counter = new AtomicLong();
+
+        final MigrationRelationship<EdgeMetadataSerialization>
+                migration = allVersions.getMigrationRelationship( currentVersion );
+
+       final Observable<List<Edge>> observable =  migrationDataProvider.getData().flatMap( new Func1<GraphNode,
+           Observable<List<Edge>>>() {
+            @Override
+            public Observable<List<Edge>> call( final GraphNode graphNode ) {
+                final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
+
+                //get edges from the source
+                return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 ).parallel( new Func1<Observable<List<Edge>>, Observable<List<Edge>>>() {
+                                                  @Override
+                                                  public Observable<List<Edge>> call( final Observable<List<Edge>> listObservable ) {
+                          return listObservable.doOnNext( new Action1<List<Edge>>() {
+                              @Override
+                              public void call( List<Edge> edges ) {
+                                  final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                  for ( Edge edge : edges ) {
+                                      logger.info( "Migrating meta for edge {}", edge );
+                                      final MutationBatch edgeBatch =
+                                              migration.to.writeEdge(  graphNode.applicationScope, edge );
+                                      batch.mergeShallow( edgeBatch );
+                                  }
+
+                                  try {
+                                      batch.execute();
+                                  }
+                                  catch ( ConnectionException e ) {
+                                      throw new RuntimeException( "Unable to perform migration", e );
+                                  }
+
+                                  //update the observer so the admin can see it
+                                  final long newCount = counter.addAndGet( edges.size() );
+
+                                  observer.update( migration.to.getImplementationVersion(),
+                                          String.format( "Currently running.  Rewritten %d edge types",
+                                                  newCount ) );
+                              }
+                          } );
+                  } } );
+            }} );
+
+        observable.longCount().toBlocking().last();
+
+        return migration.to.getImplementationVersion();
+
+    }
+
+
+
+
+    @Override
+    public boolean supports( final int currentVersion ) {
+        return currentVersion < edgeMetadataSerializationV2.getImplementationVersion();
+    }
+
+
+    @Override
+    public int getMaxVersion() {
+        //we only support up to v2 ATM
+        return edgeMetadataSerializationV2.getImplementationVersion();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
index 0d3405b..c989822 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
@@ -30,7 +30,6 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializ
 import org.apache.usergrid.persistence.core.migration.data.newimpls.AbstractMigrationPlugin;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
 import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -40,15 +39,15 @@ import com.google.inject.Singleton;
  * Migration plugin for the collection module
  */
 @Singleton
-public class GraphMigrationPlugin extends AbstractMigrationPlugin<ApplicationScope> {
+public class GraphMigrationPlugin extends AbstractMigrationPlugin<GraphNode> {
 
     public static final String PLUGIN_NAME = "graph-data";
 
 
 
     @Inject
-    public GraphMigrationPlugin( final Set<DataMigration2<ApplicationScope>> entityDataMigrations,
-                                      final MigrationDataProvider<ApplicationScope> entityIdScopeDataMigrationProvider,
+    public GraphMigrationPlugin( final Set<DataMigration2<GraphNode>> entityDataMigrations,
+                                      final MigrationDataProvider<GraphNode> entityIdScopeDataMigrationProvider,
                                       final MigrationInfoSerialization migrationInfoSerialization ) {
         super( entityDataMigrations, entityIdScopeDataMigrationProvider, migrationInfoSerialization );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphNode.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphNode.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphNode.java
new file mode 100644
index 0000000..00aa617
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.migration;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Traverses a graph give the scope and then will traverse from the root node to the last scope.
+ */
+public class GraphNode {
+    public final ApplicationScope applicationScope;
+    public final Id entryNode;
+
+
+    public GraphNode( final ApplicationScope applicationScope, final Id entryNode ) {
+        this.applicationScope = applicationScope;
+        this.entryNode = entryNode;
+    }
+}


Mime
View raw message