usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [40/50] incubator-usergrid git commit: add replay strategy
Date Mon, 16 Mar 2015 23:38:08 GMT
add replay strategy


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a1b557d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a1b557d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a1b557d0

Branch: refs/heads/two-dot-o
Commit: a1b557d073fd8dbe4c20f949ce7497db01d7cea5
Parents: fde59b7
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Fri Mar 13 11:25:52 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Fri Mar 13 11:25:52 2015 -0600

----------------------------------------------------------------------
 .../mvcc/stage/write/WriteUniqueVerify.java     | 167 ++++++++++++-------
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   6 +-
 .../persistence/core/astyanax/CassandraFig.java |   4 +-
 3 files changed, 111 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b557d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 1c30e75..5f96b92 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -25,7 +25,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.hystrix.Hystrix;
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixThreadPoolProperties;
 import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,12 +73,14 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
     protected final SerializationFig serializationFig;
 
     protected final Keyspace keyspace;
+    private final CassandraConfig cassandraFig;
 
 
     @Inject
     public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
-                              final SerializationFig serializationFig, final Keyspace keyspace ) {
+                              final SerializationFig serializationFig, final Keyspace keyspace, final CassandraConfig cassandraFig ) {
         this.keyspace = keyspace;
+        this.cassandraFig = cassandraFig;
 
         Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
         Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
@@ -88,100 +97,134 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
         final MvccEntity mvccEntity = ioevent.getEvent();
 
-        final Id entityId = mvccEntity.getId();
-
-        final UUID entityVersion = mvccEntity.getVersion();
-
-
         final Entity entity = mvccEntity.getEntity().get();
 
-
         final CollectionScope scope = ioevent.getEntityCollection();
 
         // use simple thread pool to verify fields in parallel
+        ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,keyspace,serializationFig,cassandraFig,scope,entity);
+        Map<String,Field>  uniquenessViolations = cmd.execute();
+        //We have violations, throw an exception
+        if ( !uniquenessViolations.isEmpty() ) {
+            throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
+        }
+    }
 
-        final Collection<Field> entityFields = entity.getFields();
+    private static class ConsistentReplayCommand extends HystrixCommand<Map<String,Field>>{
+
+        private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+        private final Keyspace keySpace;
+        private final SerializationFig serializationFig;
+        private final CassandraConfig fig;
+        private final CollectionScope scope;
+        private final Entity entity;
+
+        public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy,Keyspace keySpace, SerializationFig serializationFig, CassandraConfig fig,CollectionScope scope, Entity entity){
+            super(REPLAY_GROUP);
+            this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+            this.keySpace = keySpace;
+            this.serializationFig = serializationFig;
+            this.fig = fig;
+            this.scope = scope;
+            this.entity = entity;
+        }
 
-        //allocate our max size, worst case
-        final List<Field> uniqueFields = new ArrayList<>( entityFields.size() );
+        @Override
+        protected Map<String, Field> run() throws Exception {
+            return executeStrategy(fig.getReadCL());
+        }
 
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        //
-        // Construct all the functions for verifying we're unique
-        //
-        for ( final Field field : entityFields ) {
+        @Override
+        protected Map<String, Field> getFallback() {
+            return executeStrategy(fig.getConsistentReadCL());
+        }
 
-            // if it's unique, create a function to validate it and add it to the list of
-            // concurrent validations
-            if ( field.isUnique() ) {
+        public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
+            Collection<Field> entityFields = entity.getFields();
+            //allocate our max size, worst case
+            final List<Field> uniqueFields = new ArrayList<Field>( entityFields.size() );
+            //now get the set of fields back
+            final UniqueValueSet uniqueValues;
+            //todo add consistencylevel and read back if fail using
 
+            try {
 
-                // use write-first then read strategy
-                final UniqueValue written = new UniqueValueImpl( field, entityId, entityVersion );
+                uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, uniqueFields );
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to read from cassandra", e );
+            }
 
-                // use TTL in case something goes wrong before entity is finally committed
-                final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
 
-                batch.mergeShallow( mb );
+            final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
 
 
-                uniqueFields.add( field );
-            }
-        }
+            //loop through each field that was unique
+            for ( final Field field : uniqueFields ) {
 
-        //short circuit nothing to do
-        if ( uniqueFields.size() == 0 ) {
-            return;
-        }
+                final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
 
+                if ( uniqueValue == null ) {
+                    throw new RuntimeException(
+                        String.format( "Could not retrieve unique value for field %s, unable to verify",
+                            field.getName() ) );
+                }
 
-        //perform the write
-        try {
-            batch.execute();
-        }
-        catch ( ConnectionException ex ) {
-            throw new RuntimeException( "Unable to write to cassandra", ex );
-        }
 
+                final Id returnedEntityId = uniqueValue.getEntityId();
 
-        //now get the set of fields back
-        final UniqueValueSet uniqueValues;
 
-        try {
-            uniqueValues = uniqueValueStrat.load( scope, uniqueFields );
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to read from cassandra", e );
-        }
+                if ( !entity.getId().equals(returnedEntityId) ) {
+                    uniquenessViolations.put( field.getName(), field );
+                }
+            }
+            final MutationBatch batch = keySpace.prepareMutationBatch();
+            //
+            // Construct all the functions for verifying we're unique
+            //
+            for ( final Field field :  entity.getFields() ) {
 
+                // if it's unique, create a function to validate it and add it to the list of
+                // concurrent validations
+                if ( field.isUnique() ) {
 
-        final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
 
+                    // use write-first then read strategy
+                    final UniqueValue written = new UniqueValueImpl( field, entity.getId(), entity.getVersion() );
 
-        //loop through each field that was unique
-        for ( final Field field : uniqueFields ) {
+                    // use TTL in case something goes wrong before entity is finally committed
+                    final MutationBatch mb = uniqueValueSerializationStrategy.write( scope, written, serializationFig.getTimeout() );
 
-            final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
+                    batch.mergeShallow( mb );
 
-            if ( uniqueValue == null ) {
-                throw new RuntimeException(
-                        String.format( "Could not retrieve unique value for field %s, unable to verify",
-                                field.getName() ) );
-            }
 
+                    uniqueFields.add(field);
+                }
+            }
 
-            final Id returnedEntityId = uniqueValue.getEntityId();
+            //short circuit nothing to do
+            if ( uniqueFields.size() == 0 ) {
+                return uniquenessViolations ;
+            }
 
 
-            if ( !entityId.equals( returnedEntityId ) ) {
-                uniquenessViolations.put( field.getName(), field );
+            //perform the write
+            try {
+                batch.execute();
+            }
+            catch ( ConnectionException ex ) {
+                throw new RuntimeException( "Unable to write to cassandra", ex );
             }
-        }
-
 
-        //We have violations, throw an exception
-        if ( !uniquenessViolations.isEmpty() ) {
-            throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
+            return uniquenessViolations;
         }
     }
+
+    /**
+     * Command group used for realtime user commands
+     */
+    public static final HystrixCommand.Setter
+        REPLAY_GROUP = HystrixCommand.Setter.withGroupKey(
+            HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
+                HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b557d0/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 4ebfd24..51cb198 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -18,6 +18,7 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.junit.Rule;
 import org.junit.Test;
@@ -64,7 +65,8 @@ public class WriteUniqueVerifyTest {
     @Inject
     private SerializationFig fig;
 
-
+    @Inject
+    private CassandraConfig cassandraConfig;
 
 
     @Test
@@ -81,7 +83,7 @@ public class WriteUniqueVerifyTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig );
 
        newStage.call(
             new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a1b557d0/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 66a716c..a907702 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -79,11 +79,11 @@ public interface CassandraFig extends GuicyFig {
     @Default( "false" )
     boolean isEmbedded();
 
-    @Default("CL_ONE")
+    @Default("CL_LOCAL_ONE")
     @Key(READ_CL)
     String getReadCL();
 
-    @Default("CL_LOCAL_ONE")
+    @Default("CL_LOCAL_QUORUM")
     @Key(READ_CONSISTENT_CL)
     String getConsistentReadCL();
 


Mime
View raw message