usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [29/38] usergrid git commit: Unique value serialization completely converted to use CQL. Complete Astyanax removal still needs to be completed.
Date Wed, 17 Aug 2016 21:48:36 GMT
Unique value serialization completely converted to use CQL.  Complete Astyanax removal still needs to be completed.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/866d11bf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/866d11bf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/866d11bf

Branch: refs/heads/master
Commit: 866d11bf3ecf909682f9fbc91195323f26a1e2ad
Parents: 0c60987
Author: Michael Russo <mrusso@apigee.com>
Authored: Sun May 8 15:45:28 2016 +0800
Committer: Michael Russo <mrusso@apigee.com>
Committed: Sun May 8 15:45:28 2016 +0800

----------------------------------------------------------------------
 .../EntityCollectionManagerFactoryImpl.java     |   9 +-
 .../impl/EntityCollectionManagerImpl.java       |  21 +-
 .../mvcc/stage/delete/UniqueCleanup.java        |  21 +-
 .../mvcc/stage/write/RollbackAction.java        |  44 +--
 .../mvcc/stage/write/WriteUniqueVerify.java     |  15 +-
 .../UniqueValueSerializationStrategy.java       |  19 +-
 .../impl/AllUniqueFieldsIterator.java           |  29 ++
 .../serialization/impl/EntityVersion.java       |  12 +-
 .../UniqueValueSerializationStrategyImpl.java   | 384 ++++++-------------
 ...iqueValueSerializationStrategyProxyImpl.java |  12 +-
 .../UniqueValueSerializationStrategyV1Impl.java | 125 +++---
 .../UniqueValueSerializationStrategyV2Impl.java | 109 ++++--
 .../serialization/impl/UniqueValueSetImpl.java  |   6 +-
 .../stage/write/WriteOptimisticVerifyTest.java  |  17 +-
 ...niqueValueSerializationStrategyImplTest.java |   5 +-
 15 files changed, 383 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index a52ee9c..71e56f5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.collection.impl;
 
 import java.util.concurrent.ExecutionException;
 
+import com.datastax.driver.core.Session;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
@@ -75,6 +76,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final Keyspace keyspace;
+    private final Session session;
     private final MetricsFactory metricsFactory;
     private final RxTaskScheduler rxTaskScheduler;
 
@@ -89,7 +91,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
                                 mvccLogEntrySerializationStrategy, keyspace,
                                 metricsFactory, serializationFig,
-                                rxTaskScheduler, scope );
+                                rxTaskScheduler, scope, session );
 
                             return target;
                         }
@@ -107,7 +109,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                final Keyspace keyspace, final EntityCacheFig entityCacheFig,
-                                               final MetricsFactory metricsFactory, @CollectionExecutorScheduler  final RxTaskScheduler rxTaskScheduler ) {
+                                               final MetricsFactory metricsFactory,
+                                               @CollectionExecutorScheduler  final RxTaskScheduler rxTaskScheduler,
+                                               final Session session ) {
 
         this.writeStart = writeStart;
         this.writeVerifyUnique = writeVerifyUnique;
@@ -125,6 +129,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.keyspace = keyspace;
         this.metricsFactory = metricsFactory;
         this.rxTaskScheduler = rxTaskScheduler;
+        this.session = session;
     }
     @Override
     public EntityCollectionManager createCollectionManager(ApplicationScope applicationScope) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index e71e6bb..6d42fa2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import com.netflix.astyanax.model.ConsistencyLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +66,6 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.codahale.metrics.Timer;
@@ -72,7 +73,6 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.OperationResult;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ColumnFamily;
@@ -81,7 +81,6 @@ import com.netflix.astyanax.serializers.StringSerializer;
 
 import rx.Observable;
 import rx.Subscriber;
-import rx.functions.Action0;
 
 
 /**
@@ -114,6 +113,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
     private final Keyspace keyspace;
+    private final Session session;
     private final Timer writeTimer;
     private final Timer deleteTimer;
     private final Timer fieldIdTimer;
@@ -136,7 +136,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                                         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                         final Keyspace keyspace, final MetricsFactory metricsFactory,
                                         final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler,
-                                        @Assisted final ApplicationScope applicationScope ) {
+                                        @Assisted final ApplicationScope applicationScope,
+                                        final Session session) {
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
         this.uniqueCleanup = uniqueCleanup;
@@ -157,6 +158,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         this.markCommit = markCommit;
 
         this.keyspace = keyspace;
+        this.session = session;
 
 
         this.applicationScope = applicationScope;
@@ -347,8 +349,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 //Load a entity for each entityId we retrieved.
                 final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime );
 
-                //now loop through and ensure the entities are there.
-                final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
+                final BatchStatement uniqueDeleteBatch = new BatchStatement();
 
                 final MutableFieldSet response = new MutableFieldSet( fields1.size() );
 
@@ -357,9 +358,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
                     //bad unique value, delete this, it's inconsistent
                     if ( entity == null || !entity.getEntity().isPresent() ) {
-                        final MutationBatch valueDelete =
-                            uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
-                        deleteBatch.mergeShallow( valueDelete );
+                        uniqueDeleteBatch.add(
+                            uniqueValueSerializationStrategy.deleteCQL( applicationScope, expectedUnique ));
                         continue;
                     }
 
@@ -371,8 +371,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 }
 
                 //TODO: explore making this an Async process
-                //We'll repair it again if we have to
-                deleteBatch.execute();
+                session.execute(uniqueDeleteBatch);
 
                 return response;
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 8aa5cfc..9f2b994 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +65,7 @@ public class UniqueCleanup
 
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final Keyspace keyspace;
+    private final Session session;
 
     private final SerializationFig serializationFig;
 
@@ -70,12 +73,14 @@ public class UniqueCleanup
     @Inject
     public UniqueCleanup( final SerializationFig serializationFig,
                           final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-                          final Keyspace keyspace, final MetricsFactory metricsFactory ) {
+                          final Keyspace keyspace, final MetricsFactory metricsFactory,
+                          final Session session ) {
 
         this.serializationFig = serializationFig;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.keyspace = keyspace;
         this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup.base" );
+        this.session = session;
     }
 
 
@@ -127,22 +132,20 @@ public class UniqueCleanup
                             //roll them up
 
                         .doOnNext( uniqueValues -> {
-                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+                            final BatchStatement uniqueCleanupBatch = new BatchStatement();
 
 
                             for ( UniqueValue value : uniqueValues ) {
                                 logger
                                     .debug( "Deleting value:{} from application scope: {} ", value, applicationScope );
                                 uniqueCleanupBatch
-                                    .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+                                    .add( uniqueValueSerializationStrategy.deleteCQL( applicationScope, value ) );
                             }
 
-                            try {
-                                uniqueCleanupBatch.execute();
-                            }
-                            catch ( ConnectionException e ) {
-                                throw new RuntimeException( "Unable to execute batch mutation", e );
-                            }
+
+                            session.execute(uniqueCleanupBatch);
+
                         } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
 
                 return ObservableTimer.time( uniqueValueCleanup, uniqueCleanupTimer );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
index 23c6dfe..e5c4c96 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,14 +53,17 @@ public class RollbackAction implements Action1<Throwable> {
 
     private final UniqueValueSerializationStrategy uniqueValueStrat;
     private final MvccLogEntrySerializationStrategy logEntryStrat;
+    private final Session session;
 
 
     @Inject
-    public RollbackAction(MvccLogEntrySerializationStrategy logEntryStrat,
-                           UniqueValueSerializationStrategy uniqueValueStrat ) {
+    public RollbackAction( final MvccLogEntrySerializationStrategy logEntryStrat,
+                           final UniqueValueSerializationStrategy uniqueValueStrat,
+                           final Session session ) {
 
         this.uniqueValueStrat = uniqueValueStrat;
         this.logEntryStrat = logEntryStrat;
+        this.session = session;
     }
 
 
@@ -72,6 +77,7 @@ public class RollbackAction implements Action1<Throwable> {
 
             // one batch to handle rollback
             MutationBatch rollbackMb = null;
+            final BatchStatement uniqueDeleteBatch = new BatchStatement();
             final Optional<Entity> entity = mvccEntity.getEntity();
 
             if ( entity.isPresent() ) {
@@ -83,45 +89,17 @@ public class RollbackAction implements Action1<Throwable> {
                         UniqueValue toDelete =
                                 new UniqueValueImpl( field, entity.get().getId(), mvccEntity.getVersion() );
 
-                        MutationBatch deleteMb = uniqueValueStrat.delete(scope,  toDelete );
+                        uniqueDeleteBatch.add(uniqueValueStrat.deleteCQL(scope,  toDelete ));
 
-                        if ( rollbackMb == null ) {
-                            rollbackMb = deleteMb;
-                        }
-                        else {
-                            rollbackMb.mergeShallow( deleteMb );
-                        }
                     }
                 }
 
-
-                if ( rollbackMb != null ) {
-                    try {
-                        rollbackMb.execute();
-                    }
-                    catch ( ConnectionException ex ) {
-                        throw new RuntimeException( "Error rolling back changes", ex );
-                    }
-                }
+                // execute the batch statements for deleting unique field entries
+                session.execute(uniqueDeleteBatch);
 
                 logEntryStrat.delete( scope, entity.get().getId(), mvccEntity.getVersion() );
             }
         }
     }
 
-
-    class FieldDeleteResult {
-
-        private final String name;
-
-
-        public FieldDeleteResult( String name ) {
-            this.name = name;
-        }
-
-
-        public String getName() {
-            return this.name;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 8e0b202..501950a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.Session;
+import com.netflix.hystrix.HystrixCommandProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,7 +69,9 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
     private final UniqueValueSerializationStrategy uniqueValueStrat;
 
-    public static int uniqueVerifyPoolSize = 100;
+    private static int uniqueVerifyPoolSize = 100;
+
+    private static int uniqueVerifyTimeoutMillis= 5000;
 
     protected final SerializationFig serializationFig;
 
@@ -224,8 +227,10 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
     /**
      * Command group used for realtime user commands
      */
-    public static final HystrixCommand.Setter
-        REPLAY_GROUP = HystrixCommand.Setter.withGroupKey(
-            HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) ).andThreadPoolPropertiesDefaults(
-                HystrixThreadPoolProperties.Setter().withCoreSize( uniqueVerifyPoolSize ) );
+    private static final HystrixCommand.Setter
+        REPLAY_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) )
+        .andThreadPoolPropertiesDefaults(
+            HystrixThreadPoolProperties.Setter().withCoreSize( uniqueVerifyPoolSize ) )
+        .andCommandPropertiesDefaults(
+            HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(uniqueVerifyTimeoutMillis));
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 56e8b87..bb6f5fe 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -41,23 +41,14 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa
 
 
     /**
-     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
-     *
-     * @param applicationScope scope
-     * @param uniqueValue Object to be written
-     *
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
-     */
-
-    /**
-     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds. -1 is the same as no ttl
+     * (lives forever)
      *
      * @param applicationScope scope
      * @param uniqueValue Object to be written
      * @param timeToLive How long object should live in seconds.  -1 implies store forever
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     * @return BatchStatement that encapsulates CQL statements, caller may or may not execute.
      */
-
     BatchStatement writeCQL(ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive );
 
     /**
@@ -103,9 +94,9 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa
      *
      * @param applicationScope The scope of the application
      * @param uniqueValue Object to be deleted.
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     * @return BatchStatement that encapsulates the CQL statements, caller may or may not execute.
      */
-    MutationBatch delete( ApplicationScope applicationScope, UniqueValue uniqueValue );
+    BatchStatement deleteCQL( ApplicationScope applicationScope, UniqueValue uniqueValue);
 
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java
new file mode 100644
index 0000000..ed210e9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+
+import java.util.Iterator;
+
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
index 274cf5d..d451adc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
@@ -42,7 +42,8 @@ public class EntityVersion {
         return entityVersion;
     }
 
-    public boolean equals( Object o ) {
+    @Override
+    public boolean equals( final Object o ) {
 
         if ( o == null || !(o instanceof EntityVersion) ) {
             return false;
@@ -60,5 +61,12 @@ public class EntityVersion {
 
         return true;
     }
-    
+
+    @Override
+    public int hashCode() {
+        int result = entityId.hashCode();
+        result = 31 * result + entityVersion.hashCode();
+        return result;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 27a8609..e0a9035 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -22,11 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.datastax.driver.core.*;
+import com.datastax.driver.core.Row;
 import com.datastax.driver.core.querybuilder.Clause;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Using;
-import com.netflix.astyanax.model.*;
-import com.netflix.astyanax.util.RangeBuilder;
 import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -41,8 +40,6 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.core.CassandraFig;
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
@@ -56,7 +53,6 @@ import com.netflix.astyanax.ColumnListMutation;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.query.RowQuery;
 
 
 /**
@@ -126,73 +122,6 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
     }
 
-
-    public MutationBatch write( final ApplicationScope collectionScope, UniqueValue value ) {
-
-
-        Preconditions.checkNotNull( value, "value is required" );
-
-
-        final Id entityId = value.getEntityId();
-        final UUID entityVersion = value.getEntityVersion();
-        final Field<?> field = value.getField();
-
-        ValidationUtils.verifyIdentity( entityId );
-        ValidationUtils.verifyVersion( entityVersion );
-
-
-        final EntityVersion ev = new EntityVersion( entityId, entityVersion );
-        final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
-
-        return doWrite( collectionScope, value, new RowOp() {
-
-            @Override
-            public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
-                colMutation.putColumn( ev, COL_VALUE );
-            }
-
-
-            @Override
-            public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
-                colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
-            }
-        } );
-    }
-
-
-    public MutationBatch write( final ApplicationScope collectionScope, final UniqueValue value,
-                                final int timeToLive ) {
-
-        Preconditions.checkNotNull( value, "value is required" );
-        Preconditions.checkArgument( timeToLive > 0, "timeToLive must be greater than 0 is required" );
-
-        final Id entityId = value.getEntityId();
-        final UUID entityVersion = value.getEntityVersion();
-        final Field<?> field = value.getField();
-
-        ValidationUtils.verifyIdentity( entityId );
-        ValidationUtils.verifyVersion( entityVersion );
-
-        final EntityVersion ev = new EntityVersion( entityId, entityVersion );
-        final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
-
-        return doWrite( collectionScope, value, new RowOp() {
-
-            @Override
-            public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
-                colMutation.putColumn( ev, COL_VALUE, timeToLive );
-            }
-
-
-            //we purposefully leave out TTL.  Worst case we issue deletes against tombstoned columns
-            //best case, we clean up an invalid secondary index entry when the log is used
-            @Override
-            public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
-                colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
-            }
-        } );
-    }
-
     @Override
     public BatchStatement writeCQL( final ApplicationScope collectionScope, final UniqueValue value,
                            final int timeToLive  ){
@@ -259,26 +188,15 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
         return batch;
 
-        /**
-         *  @Override
-        public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
-        colMutation.putColumn( ev, COL_VALUE );
-        }
-
-
-         @Override
-         public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
-         colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
-         }
-         */
     }
 
 
     @Override
-    public MutationBatch delete( final ApplicationScope scope, UniqueValue value ) {
+    public BatchStatement deleteCQL( final ApplicationScope scope, UniqueValue value){
 
         Preconditions.checkNotNull( value, "value is required" );
 
+        final BatchStatement batch = new BatchStatement();
 
         final Id entityId = value.getEntityId();
         final UUID entityVersion = value.getEntityVersion();
@@ -291,52 +209,31 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
         final EntityVersion ev = new EntityVersion( entityId, entityVersion );
         final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
 
-        return doWrite( scope, value, new RowOp() {
-
-            @Override
-            public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
-                colMutation.deleteColumn( ev );
-            }
-
-
-            @Override
-            public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
-                colMutation.deleteColumn( uniqueFieldEntry );
-            }
-        } );
-    }
 
+        ByteBuffer partitionKey = getPartitionKey( scope.getApplication(), value.getEntityId().getType(),
+            value.getField().getTypeName().toString(), value.getField().getName(), value.getField().getValue());
 
-    /**
-     * Do the column update or delete for the given column and row key
-     *
-     * @param applicationScope We need to use this when getting the keyspace
-     * @param uniqueValue The unique value to write
-     * @param op The operation to write
-     */
-    private MutationBatch doWrite( ApplicationScope applicationScope, UniqueValue uniqueValue, RowOp op ) {
-        final MutationBatch batch = keyspace.prepareMutationBatch();
+        ByteBuffer columnValue = serializeUniqueValueColumn(ev);
 
-        final Id applicationId = applicationScope.getApplication();
+        final Clause uniqueEqKey = QueryBuilder.eq("key", partitionKey );
+        final Clause uniqueEqColumn = QueryBuilder.eq("column1", columnValue );
+        Statement uniqueDelete = QueryBuilder.delete().from(TABLE_UNIQUE_VALUES).where(uniqueEqKey).and(uniqueEqColumn);
+        batch.add(uniqueDelete);
 
-        final FieldKey fieldKey = createUniqueValueKey( applicationId, uniqueValue.getEntityId().getType(), uniqueValue.getField() );
 
 
-        op.doLookup( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( applicationId, fieldKey ) ) );
+        ByteBuffer logPartitionKey = getLogPartitionKey(scope.getApplication(), entityId);
+        ByteBuffer logColumnValue = serializeUniqueValueLogColumn(uniqueFieldEntry);
 
 
-        final EntityKey entityKey = createEntityUniqueLogKey( applicationId, uniqueValue.getEntityId() );
+        final Clause uniqueLogEqKey = QueryBuilder.eq("key", logPartitionKey );
+        final Clause uniqueLogEqColumn = QueryBuilder.eq("column1", logColumnValue );
 
-        op.doLog( batch.withRow( CF_ENTITY_UNIQUE_VALUE_LOG,
-            ScopedRowKey.fromKey( applicationId, entityKey ) ) );
+        Statement uniqueLogDelete = QueryBuilder.delete()
+            .from(TABLE_UNIQUE_VALUES_LOG).where(uniqueLogEqKey).and( uniqueLogEqColumn);
 
+        batch.add(uniqueLogDelete);
 
-        if ( log.isTraceEnabled() ) {
-            log.trace( "Writing unique value version={} name={} value={} ",
-                    uniqueValue.getEntityVersion(), uniqueValue.getField().getName(),
-                    uniqueValue.getField().getValue()
-                );
-        }
 
 
         return batch;
@@ -364,59 +261,6 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     }
 
 
-    private UniqueValueSet loadLegacy(final ApplicationScope appScope,
-                                      final String type, final Collection<Field> fields) throws ConnectionException {
-        final List<ScopedRowKey<FieldKey>> keys = new ArrayList<>( fields.size() );
-
-        final Id applicationId = appScope.getApplication();
-
-        for ( Field field : fields ) {
-
-            final FieldKey key = createUniqueValueKey( applicationId, type,  field );
-
-
-            final ScopedRowKey<FieldKey> rowKey =
-                ScopedRowKey.fromKey( applicationId, key );
-
-            keys.add( rowKey );
-        }
-
-        final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
-
-        Iterator<com.netflix.astyanax.model.Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
-            keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.CL_LOCAL_QUORUM ).getKeySlice( keys )
-                .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator();
-
-
-        while ( results.hasNext() )
-
-        {
-
-            final com.netflix.astyanax.model.Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next();
-
-
-            final Field field = parseRowKey( unique.getKey() );
-
-            final Iterator<Column<EntityVersion>> columnList = unique.getColumns().iterator();
-
-            //sanity check, nothing to do, skip it
-            if ( !columnList.hasNext() ) {
-                continue;
-            }
-
-            final EntityVersion entityVersion = columnList.next().getName();
-
-
-            final UniqueValueImpl uniqueValue =
-                new UniqueValueImpl( field, entityVersion.getEntityId(), entityVersion.getEntityVersion() );
-
-            uniqueValueSet.addValue( uniqueValue );
-        }
-
-        return uniqueValueSet;
-
-    }
-
     private UniqueValueSet loadCQL( final ApplicationScope appScope, final com.datastax.driver.core.ConsistencyLevel consistencyLevel,
                                 final String type, final Collection<Field> fields ) throws ConnectionException {
 
@@ -460,7 +304,6 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
             List<Object> keyContents = deserializePartitionKey(partitionKey);
             List<Object> columnContents = deserializeUniqueValueColumn(column);
 
-            Field field = null;
             FieldTypeName fieldType;
             String name;
             String value;
@@ -478,29 +321,8 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
             }
 
-            switch ( fieldType ) {
-                case BOOLEAN:
-                    field = new BooleanField( name, Boolean.parseBoolean( value ) );
-                    break;
-                case DOUBLE:
-                    field = new DoubleField( name, Double.parseDouble( value ) );
-                    break;
-                case FLOAT:
-                    field = new FloatField( name, Float.parseFloat( value ) );
-                    break;
-                case INTEGER:
-                    field =  new IntegerField( name, Integer.parseInt( value ) );
-                    break;
-                case LONG:
-                    field = new LongField( name, Long.parseLong( value ) );
-                    break;
-                case STRING:
-                    field = new StringField( name, value );
-                    break;
-                case UUID:
-                    field = new UUIDField( name, UUID.fromString( value ) );
-                    break;
-            }
+            Field field = getField(name, value, fieldType);
+
 
             final EntityVersion entityVersion = new EntityVersion(
                 new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
@@ -526,59 +348,17 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
         Preconditions.checkNotNull( entityId, "entity id is required" );
 
 
-        final Id applicationId = collectionScope.getApplication();
-
-        final EntityKey entityKey = createEntityUniqueLogKey( applicationId, entityId );
-
-
-        final ScopedRowKey<EntityKey> rowKey =
-            ScopedRowKey.fromKey( applicationId, entityKey );
-
-
-        RowQuery<ScopedRowKey<EntityKey>, UniqueFieldEntry> query =
-            keyspace.prepareQuery( CF_ENTITY_UNIQUE_VALUE_LOG ).getKey( rowKey )
-                    .withColumnRange( ( UniqueFieldEntry ) null, null, false, serializationFig.getBufferSize() );
-
-        return new ColumnNameIterator( query, new UniqueEntryParser( entityId ), false );
-    }
+        Clause inKey = QueryBuilder.in("key", getLogPartitionKey(collectionScope.getApplication(), entityId));
 
+        Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES_LOG)
+            .where(inKey);
 
-    /**
-     * Simple callback to perform puts and deletes with a common row setup code
-     */
-    private interface RowOp {
+        return new AllUniqueFieldsIterator(session, statement, entityId);
 
-        /**
-         * Execute the mutation into the lookup CF_UNIQUE_VALUES row
-         */
-        void doLookup( ColumnListMutation<EntityVersion> colMutation );
 
-        /**
-         * Execute the mutation into the lCF_ENTITY_UNIQUE_VALUESLUE row
-         */
-        void doLog( ColumnListMutation<UniqueFieldEntry> colMutation );
     }
 
 
-    /**
-     * Converts raw columns to the expected output
-     */
-    private static final class UniqueEntryParser implements ColumnParser<UniqueFieldEntry, UniqueValue> {
-
-        private final Id entityId;
-
-
-        private UniqueEntryParser( final Id entityId ) {this.entityId = entityId;}
-
-
-        @Override
-        public UniqueValue parseColumn( final Column<UniqueFieldEntry> column ) {
-            final UniqueFieldEntry entry = column.getName();
-
-            return new UniqueValueImpl( entry.getField(), entityId, entry.getVersion() );
-        }
-    }
-
 
     @Override
     public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
@@ -621,27 +401,9 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     protected abstract TableDefinition getUniqueValuesTable();
 
 
-    /**
-     * Generate a key that is compatible with the column family
-     *
-     * @param applicationId The applicationId
-     * @param type The type in the field
-     * @param field The field we're creating the key for
-     */
-    protected abstract FieldKey createUniqueValueKey(final Id applicationId, final String type, final Field field );
-
-    /**
-     * Parse the row key into the field
-     * @param rowKey
-     * @return
-     */
-    protected abstract Field parseRowKey(final ScopedRowKey<FieldKey> rowKey);
-
-
     protected abstract List<Object> deserializePartitionKey(ByteBuffer bb);
 
-
-    protected abstract Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry);
+    protected abstract ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry);
 
     protected abstract ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue );
 
@@ -651,6 +413,8 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
     protected abstract List<Object> deserializeUniqueValueColumn(ByteBuffer bb);
 
+    protected abstract List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb);
+
 
 
 
@@ -672,4 +436,100 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
      * @param uniqueValueId The uniqueValue
      */
     protected abstract EntityKey createEntityUniqueLogKey(final Id applicationId,  final Id uniqueValueId );
+
+
+    public class AllUniqueFieldsIterator implements Iterable<UniqueValue>, Iterator<UniqueValue> {
+
+        private final Session session;
+        private final Statement query;
+        private final Id entityId;
+
+        private Iterator<Row> sourceIterator;
+
+
+
+        public AllUniqueFieldsIterator( final Session session, final Statement query, final Id entityId){
+
+            this.session = session;
+            this.query = query;
+            this.entityId = entityId;
+
+        }
+
+
+        @Override
+        public Iterator<UniqueValue> iterator() {
+            return this;
+        }
+
+        @Override
+        public boolean hasNext() {
+
+            if ( sourceIterator == null ) {
+
+                advanceIterator();
+
+                return sourceIterator.hasNext();
+            }
+
+            return sourceIterator.hasNext();
+        }
+
+        @Override
+        public UniqueValue next() {
+
+            com.datastax.driver.core.Row next = sourceIterator.next();
+
+            ByteBuffer column = next.getBytesUnsafe("column1");
+
+            List<Object> columnContents = deserializeUniqueValueLogColumn(column);
+
+            UUID version = (UUID) columnContents.get(0);
+            String name = (String) columnContents.get(1);
+            String value = (String) columnContents.get(2);
+            FieldTypeName fieldType = FieldTypeName.valueOf((String) columnContents.get(3));
+
+
+            return new UniqueValueImpl(getField(name, value, fieldType), entityId, version);
+
+        }
+
+        private void advanceIterator() {
+
+            sourceIterator = session.execute(query).iterator();
+        }
+    }
+
+    private Field getField( String name, String value, FieldTypeName fieldType){
+
+        Field field = null;
+
+        switch ( fieldType ) {
+            case BOOLEAN:
+                field = new BooleanField( name, Boolean.parseBoolean( value ) );
+                break;
+            case DOUBLE:
+                field = new DoubleField( name, Double.parseDouble( value ) );
+                break;
+            case FLOAT:
+                field = new FloatField( name, Float.parseFloat( value ) );
+                break;
+            case INTEGER:
+                field =  new IntegerField( name, Integer.parseInt( value ) );
+                break;
+            case LONG:
+                field = new LongField( name, Long.parseLong( value ) );
+                break;
+            case STRING:
+                field = new StringField( name, value );
+                break;
+            case UUID:
+                field = new UUIDField( name, UUID.fromString( value ) );
+                break;
+        }
+
+        return field;
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
index bbfaa2d..dc5b48f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
@@ -125,19 +125,19 @@ public class UniqueValueSerializationStrategyProxyImpl implements UniqueValueSer
 
 
     @Override
-    public MutationBatch delete( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) {
+    public BatchStatement deleteCQL( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) {
         final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
 
         if ( migration.needsMigration() ) {
-            final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+            final BatchStatement batch = new BatchStatement();
 
-            aggregateBatch.mergeShallow( migration.from.delete( applicationScope, uniqueValue ) );
-            aggregateBatch.mergeShallow( migration.to.delete( applicationScope, uniqueValue ) );
+            batch.add(migration.from.deleteCQL( applicationScope, uniqueValue ) );
+            batch.add(migration.to.deleteCQL( applicationScope, uniqueValue ) );
 
-            return aggregateBatch;
+            return batch;
         }
 
-        return migration.to.delete( applicationScope, uniqueValue );
+        return migration.to.deleteCQL( applicationScope, uniqueValue );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
index 75666fa..cbd8a3e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
@@ -176,26 +176,6 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
 
 
     @Override
-    protected CollectionPrefixedKey<Field> createUniqueValueKey( final Id applicationId,
-                                                                 final String type, final Field field) {
-
-
-        final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( type );
-
-
-        final CollectionPrefixedKey<Field> uniquePrefixedKey =
-            new CollectionPrefixedKey<>( collectionName, applicationId, field );
-
-        return uniquePrefixedKey;
-    }
-
-
-    @Override
-    protected Field parseRowKey( final ScopedRowKey<CollectionPrefixedKey<Field>> rowKey ) {
-        return rowKey.getKey().getSubKey();
-    }
-
-    @Override
     protected List<Object> deserializePartitionKey(ByteBuffer bb){
 
 
@@ -230,23 +210,23 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
     }
 
     @Override
-    protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+    protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
 
         /**
-         *         final UUID version = value.getVersion();
-         final Field<?> field = value.getField();
+         *  final UUID version = value.getVersion();
+            final Field<?> field = value.getField();
 
-         final FieldTypeName fieldType = field.getTypeName();
-         final String fieldValue = field.getValue().toString().toLowerCase();
+             final FieldTypeName fieldType = field.getTypeName();
+             final String fieldValue = field.getValue().toString().toLowerCase();
 
 
-         DynamicComposite composite = new DynamicComposite(  );
+             DynamicComposite composite = new DynamicComposite(  );
 
-         //we want to sort ascending to descending by version
-         composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
-         composite.addComponent( field.getName(), STRING_SERIALIZER );
-         composite.addComponent( fieldValue, STRING_SERIALIZER );
-         composite.addComponent( fieldType.name() , STRING_SERIALIZER);
+             //we want to sort ascending to descending by version
+             composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+             composite.addComponent( field.getName(), STRING_SERIALIZER );
+             composite.addComponent( fieldValue, STRING_SERIALIZER );
+             composite.addComponent( fieldType.name() , STRING_SERIALIZER);
          */
 
         // values are serialized as strings, not sure why, and always lower cased
@@ -337,15 +317,15 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
     protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
 
         /**
-         *         final Id entityId = ev.getEntityId();
-         final UUID entityUuid = entityId.getUuid();
-         final String entityType = entityId.getType();
+         *  final Id entityId = ev.getEntityId();
+            final UUID entityUuid = entityId.getUuid();
+            final String entityType = entityId.getType();
 
-         CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+            CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
 
-         builder.addUUID( entityVersion );
-         builder.addUUID( entityUuid );
-         builder.addString(entityType );
+            builder.addUUID( entityVersion );
+            builder.addUUID( entityUuid );
+            builder.addString(entityType );
          */
 
         String comparator = "UTF8Type";
@@ -418,7 +398,49 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
             }else if(count ==1){
                 stuff.add(new UUID(data.getLong(), data.getLong()));
             }else{
-                stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED));
+                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+            count++;
+        }
+
+        return stuff;
+
+    }
+
+    @Override
+    protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){
+
+
+        /**
+         *  List<Object> keys = new ArrayList<>(4);
+            keys.add(fieldEntry.getVersion());
+            keys.add(fieldEntry.getField().getName());
+            keys.add(fieldValueString);
+            keys.add(fieldEntry.getField().getTypeName().name());
+         */
+
+        List<Object> stuff = new ArrayList<>();
+        int count = 0;
+        while(bb.hasRemaining()){
+
+            // the comparator info is different for the UUID reversed type vs. UTF8 type
+            if(count ==0){
+                bb.getShort(); // take the reversed comparator byte off
+            }else {
+                ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
+            }
+
+            ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+            // first composite is a UUID, rest are strings
+            if(count == 0) {
+                stuff.add(new UUID(data.getLong(), data.getLong()));
+            }else{
+                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
             }
 
             byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
@@ -465,15 +487,15 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
 
         final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );
 
-//        final CollectionPrefixedKey<Field> uniquePrefixedKey =
-//            new CollectionPrefixedKey<>( collectionName, applicationId, field );
-
-//        //read back the id
-//        final Id orgId = ID_SER.fromComposite( parser );
-//        final Id scopeId = ID_SER.fromComposite( parser );
-//        final String scopeName = parser.readString();
-//        final K value = keySerializer.fromComposite( parser );
+        /**
+            final CollectionPrefixedKey<Field> uniquePrefixedKey =
+                new CollectionPrefixedKey<>( collectionName, applicationId, field );
 
+            final Id orgId = ID_SER.fromComposite( parser );
+            final Id scopeId = ID_SER.fromComposite( parser );
+            final String scopeName = parser.readString();
+            final K value = keySerializer.fromComposite( parser );
+        **/
 
         // values are serialized as strings, not sure why, and always lower cased
         String fieldValueString = fieldValue.toString().toLowerCase();
@@ -521,10 +543,11 @@ public class UniqueValueSerializationStrategyV1Impl  extends UniqueValueSerializ
 
 
        final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );
-//
-//
-//        final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
-//            new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId );
+
+      /**
+            final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
+                new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId );
+       **/
 
         List<Object> keys = new ArrayList<>(4);
         keys.add(appUUID);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
index 4177c37..3e4932a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
@@ -40,7 +40,6 @@ import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -171,30 +170,18 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
     }
 
 
-
-    @Override
-    protected TypeField createUniqueValueKey( final Id applicationId,  final String type, final Field field) {
-        return new TypeField(type,field);
-    }
-
-
-    @Override
-    protected Field parseRowKey( final ScopedRowKey<TypeField> rowKey ) {
-        return rowKey.getKey().getField();
-    }
-
     @Override
     protected List<Object> deserializePartitionKey(ByteBuffer bb){
 
 
         /**
-         *  List<Object> keys = new ArrayList<>(6);
-         keys.add(0, appUUID); // UUID
-         keys.add(1, applicationType); // String
-         keys.add(2, entityType); // String
-         keys.add(3, fieldType); // String
-         keys.add(4, fieldName); // String
-         keys.add(5, fieldValueString); // String
+         *   List<Object> keys = new ArrayList<>(6);
+             keys.add(0, appUUID); // UUID
+             keys.add(1, applicationType); // String
+             keys.add(2, entityType); // String
+             keys.add(3, fieldType); // String
+             keys.add(4, fieldName); // String
+             keys.add(5, fieldValueString); // String
 
          */
 
@@ -215,23 +202,23 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
     }
 
     @Override
-    protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+    protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
 
         /**
-         *         final UUID version = value.getVersion();
-         final Field<?> field = value.getField();
+         *   final UUID version = value.getVersion();
+             final Field<?> field = value.getField();
 
-         final FieldTypeName fieldType = field.getTypeName();
-         final String fieldValue = field.getValue().toString().toLowerCase();
+             final FieldTypeName fieldType = field.getTypeName();
+             final String fieldValue = field.getValue().toString().toLowerCase();
 
 
-         DynamicComposite composite = new DynamicComposite(  );
+             DynamicComposite composite = new DynamicComposite(  );
 
-         //we want to sort ascending to descending by version
-         composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
-         composite.addComponent( field.getName(), STRING_SERIALIZER );
-         composite.addComponent( fieldValue, STRING_SERIALIZER );
-         composite.addComponent( fieldType.name() , STRING_SERIALIZER);
+             //we want to sort ascending to descending by version
+             composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+             composite.addComponent( field.getName(), STRING_SERIALIZER );
+             composite.addComponent( fieldValue, STRING_SERIALIZER );
+             composite.addComponent( fieldType.name() , STRING_SERIALIZER);
          */
 
         // values are serialized as strings, not sure why, and always lower cased
@@ -250,7 +237,7 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
             fieldEntry.getField().getTypeName().name().length();
 
         // we always need to add length for the 2 byte comparator short,  2 byte length short and 1 byte equality
-        size += keys.size()*65;
+        size += keys.size()*5;
 
         // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
         size += keys.size()*comparator.length();
@@ -322,15 +309,15 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
     protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
 
         /**
-         *         final Id entityId = ev.getEntityId();
-         final UUID entityUuid = entityId.getUuid();
-         final String entityType = entityId.getType();
+         *   final Id entityId = ev.getEntityId();
+             final UUID entityUuid = entityId.getUuid();
+             final String entityType = entityId.getType();
 
-         CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+             CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
 
-         builder.addUUID( entityVersion );
-         builder.addUUID( entityUuid );
-         builder.addString(entityType );
+             builder.addUUID( entityVersion );
+             builder.addUUID( entityUuid );
+             builder.addString(entityType );
          */
 
         String comparator = "UTF8Type";
@@ -403,7 +390,49 @@ public class UniqueValueSerializationStrategyV2Impl  extends UniqueValueSerializ
             }else if(count ==1){
                 stuff.add(new UUID(data.getLong(), data.getLong()));
             }else{
-                stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED));
+                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+            count++;
+        }
+
+        return stuff;
+
+    }
+
+    @Override
+    protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){
+
+
+        /**
+         *   List<Object> keys = new ArrayList<>(4);
+             keys.add(fieldEntry.getVersion());
+             keys.add(fieldEntry.getField().getName());
+             keys.add(fieldValueString);
+             keys.add(fieldEntry.getField().getTypeName().name());
+         */
+
+        List<Object> stuff = new ArrayList<>();
+        int count = 0;
+        while(bb.hasRemaining()){
+
+            // the comparator info is different for the UUID reversed type vs. UTF8 type
+            if(count ==0){
+                bb.getShort(); // take the reversed comparator byte off
+            }else {
+                ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
+            }
+
+            ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+            // first composite is a UUID, rest are strings
+            if(count == 0) {
+                stuff.add(new UUID(data.getLong(), data.getLong()));
+            }else{
+                stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
             }
 
             byte equality = bb.get(); // we don't use this but take the equality byte off the buffer

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
index 8dd9528..853913b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
@@ -37,7 +37,11 @@ public class UniqueValueSetImpl implements UniqueValueSet {
 
 
     public void addValue(UniqueValue value){
-        values.put( value.getField().getName(), value );
+        values.putIfAbsent( value.getField().getName(), value );
+        // ^^ putIfAbsent important here as CQL returns column values differently than Asytanax/thrift due to CQL not
+        //    having a 'column range' for each row slice and all columns are returned. We don't want to overwrite the
+        //    first column values retrieved
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
index 6a705e4..148cc09 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
@@ -21,7 +21,12 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,8 +58,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
-
 @UseModules( TestCollectionModule.class )
 public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
 
@@ -110,6 +113,7 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
         when( scope.getApplication() )
             .thenReturn( new SimpleId( UUIDGenerator.newTimeUUID(), "organization" ) );
 
+        final Session session = mock(Session.class);
 
         // there is an entity
         final Entity entity = generateEntity();
@@ -135,16 +139,13 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
         UniqueValueSerializationStrategy uvstrat = mock( UniqueValueSerializationStrategy.class);
         UniqueValue uv1 = new UniqueValueImpl(entity.getField("name"), entity.getId(), entity.getVersion());
         UniqueValue uv2 = new UniqueValueImpl(  entity.getField("identifier"), entity.getId(), entity.getVersion());
-        MutationBatch mb = mock( MutationBatch.class );
-        when( uvstrat.delete(scope, uv1) ).thenReturn(mb);
-        when( uvstrat.delete(scope, uv2) ).thenReturn(mb);
 
         // Run the stage, conflict should be detected
         final MvccEntity mvccEntity = fromEntity( entity );
         boolean conflictDetected = false;
 
         WriteOptimisticVerify newStage = new WriteOptimisticVerify( mvccLog );
-        RollbackAction rollbackAction = new RollbackAction( mvccLog, uvstrat );
+        RollbackAction rollbackAction = new RollbackAction( mvccLog, uvstrat, session );
 
         try {
             newStage.call( new CollectionIoEvent<>(scope, mvccEntity));
@@ -157,8 +158,8 @@ public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
         assertTrue( conflictDetected );
 
         // check that unique values were deleted
-        verify( uvstrat, times(1) ).delete(scope,  uv1 );
-        verify( uvstrat, times(1) ).delete(scope,  uv2 );
+        verify( uvstrat, times(1) ).deleteCQL(scope,  uv1 );
+        verify( uvstrat, times(1) ).deleteCQL(scope,  uv2 );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/866d11bf/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
index 3ffdb65..185cfb7 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
@@ -192,7 +192,10 @@ public abstract class UniqueValueSerializationStrategyImplTest {
         BatchStatement batch = strategy.writeCQL( scope, stored, -1);
         session.execute(batch);
 
-        strategy.delete( scope, stored ).execute();
+
+        //strategy.delete( scope, stored ).execute();
+        BatchStatement deleteBatch = strategy.deleteCQL(scope, stored);
+        session.execute(deleteBatch);
 
         UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) );
 


Mime
View raw message