usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [1/2] usergrid git commit: WIP overwrite
Date Wed, 09 Sep 2015 23:22:24 GMT
Repository: usergrid
Updated Branches:
  refs/heads/USERGRID-909 0e67f19a6 -> dc3e9d198


http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/guice/LockModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/guice/LockModule.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/guice/LockModule.java
index a195612..b1e8226 100644
--- a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/guice/LockModule.java
+++ b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/guice/LockModule.java
@@ -26,8 +26,8 @@ import org.apache.usergrid.persistence.locks.impl.CassandraLockFig;
 import org.apache.usergrid.persistence.locks.impl.CassandraLockManager;
 import org.apache.usergrid.persistence.locks.impl.LockConsistency;
 import org.apache.usergrid.persistence.locks.impl.LockConsistencyImpl;
-import org.apache.usergrid.persistence.locks.impl.NodeShardProposalSerialization;
-import org.apache.usergrid.persistence.locks.impl.NodeShardProposalSerializationImpl;
+import org.apache.usergrid.persistence.locks.impl.LockProposalSerialization;
+import org.apache.usergrid.persistence.locks.impl.LockProposalSerializationImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Key;
@@ -50,10 +50,10 @@ public class LockModule extends AbstractModule {
 
         bind( LockConsistency.class ).to( LockConsistencyImpl.class );
 
-        bind( NodeShardProposalSerialization.class ).to( NodeShardProposalSerializationImpl.class );
+        bind( LockProposalSerialization.class ).to( LockProposalSerializationImpl.class );
 
         Multibinder<Migration> migrationBinder = Multibinder.newSetBinder( binder(), Migration.class );
         //entity serialization versions
-        migrationBinder.addBinding().to( Key.get( NodeShardProposalSerialization.class ) );
+        migrationBinder.addBinding().to( Key.get( LockProposalSerialization.class ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockFig.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockFig.java
index 4583dad..d1bfb44 100644
--- a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockFig.java
+++ b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockFig.java
@@ -32,6 +32,11 @@ public interface CassandraLockFig extends GuicyFig {
     String WRITE_CONSISTENT_CL = "usergrid.lock.multiregion.write.cl";
 
 
+    String POLL_WAIT = "usergrid.lock.poll.wait";
+
+    String POLL_COUNT = "usergrid.lock.poll.count";
+
+
     /**
      * Get multi region write consistency level
      * @return
@@ -46,4 +51,12 @@ public interface CassandraLockFig extends GuicyFig {
     @Default( "CL_LOCAL_QUORUM" )
     @Key( READ_CONSISTENT_CL )
     String getMultiRegionLockReadConsistency();
+
+    @Default("50")
+    @Key( POLL_WAIT )
+    long getLockPollWait();
+
+    @Default( "2" )
+    @Key( POLL_COUNT )
+    int getPollCount();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManager.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManager.java
index 243ea74..09b03a1 100644
--- a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManager.java
+++ b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManager.java
@@ -33,12 +33,16 @@ import com.google.inject.Singleton;
 @Singleton
 public class CassandraLockManager implements LockManager {
 
-    private final NodeShardProposalSerialization nodeShardProposalSerialization;
+    private final CassandraLockFig cassandraLockFig;
+    private final LockProposalSerialization lockProposalSerialization;
 
 
     @Inject
-    public CassandraLockManager( final NodeShardProposalSerialization nodeShardProposalSerialization ) {
-        this.nodeShardProposalSerialization = nodeShardProposalSerialization;
+    public CassandraLockManager( final CassandraLockFig cassandraLockFig, final LockProposalSerialization lockProposalSerialization ) {
+
+
+        this.cassandraLockFig = cassandraLockFig;
+        this.lockProposalSerialization = lockProposalSerialization;
     }
 
 
@@ -46,7 +50,7 @@ public class CassandraLockManager implements LockManager {
     public Lock createMultiRegionLock( final LockId key ) {
         Preconditions.checkNotNull(key, "Key is required");
 
-        return new CassandraMultiRegionLock(key, nodeShardProposalSerialization);
+        return new CassandraMultiRegionLock( cassandraLockFig, key, lockProposalSerialization );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraMultiRegionLock.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraMultiRegionLock.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraMultiRegionLock.java
index 62c157e..1565af8 100644
--- a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraMultiRegionLock.java
+++ b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/CassandraMultiRegionLock.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.locks.Lock;
 import org.apache.usergrid.persistence.locks.LockId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 
@@ -34,14 +35,16 @@ import com.google.common.base.Preconditions;
 public class CassandraMultiRegionLock implements Lock {
 
     private final LockId lockId;
-    private final NodeShardProposalSerialization nodeShardProposalSerialization;
+    private final LockProposalSerialization lockProposalSerialization;
     private final UUID lockUUID;
+    private final CassandraLockFig cassandraLockFig;
 
 
-    public CassandraMultiRegionLock(final LockId lockId,
-                          final NodeShardProposalSerialization nodeShardProposalSerialization ) {
+    public CassandraMultiRegionLock( final CassandraLockFig cassandraLockFig, final LockId lockId,
+                                     final LockProposalSerialization lockProposalSerialization ) {
+        this.cassandraLockFig = cassandraLockFig;
         this.lockId = lockId;
-        this.nodeShardProposalSerialization = nodeShardProposalSerialization;
+        this.lockProposalSerialization = lockProposalSerialization;
         this.lockUUID = UUIDGenerator.newTimeUUID();
     }
 
@@ -49,25 +52,76 @@ public class CassandraMultiRegionLock implements Lock {
     @Override
     public boolean tryLock( final long timeToLive, final TimeUnit timeUnit ) {
 
-        final long expiration = timeUnit.toSeconds( timeToLive );
+        final long expirationLong = timeUnit.toSeconds( timeToLive );
 
-        Preconditions.checkArgument(expiration > Integer.MAX_VALUE, "Expiration cannot be longer than "  + Integer.MAX_VALUE);
+        Preconditions.checkArgument( expirationLong <= Integer.MAX_VALUE,
+            "Expiration cannot be longer than " + Integer.MAX_VALUE );
 
-        final LockCandidate lockCandidate = this.nodeShardProposalSerialization.writeNewValue( lockId, lockUUID, ( int ) expiration );
+        final int expiration = ( int ) expirationLong;
+
+        final LockCandidate lockCandidate =
+            this.lockProposalSerialization.writeNewValue( lockId, lockUUID, expiration );
 
         //now check if we need to ack our previous
 
+        if ( lockCandidate.isLocked( lockUUID ) ) {
+            return true;
+        }
+
+
+        final Optional<UUID> uuidToAck = lockCandidate.getValueToAck( lockUUID );
+
+        if ( uuidToAck.isPresent() ) {
+            this.lockProposalSerialization.ackProposed( lockId, lockUUID, uuidToAck.get(), expiration );
+        }
 
 
+        //we should poll to see if we can get the lock
+        if(lockCandidate.shouldPoll( lockUUID ) && pollForLock()){
+            return true;
+        }
 
 
+        //we don't have a lock, delete our candidate
+        unlock();
 
         return false;
     }
 
 
+    /**
+     * Sleep then poll the lock, if we get it, return true, otherwise short circuit
+     * @return
+     */
+    private boolean pollForLock(){
+        for(int i = 0; i < cassandraLockFig.getPollCount(); i ++){
+
+            try {
+                Thread.sleep( cassandraLockFig.getLockPollWait() );
+            }
+            catch ( InterruptedException e ) {
+                throw new RuntimeException( "Unable to sleep on poll" );
+            }
+
+            final LockCandidate lockState = this.lockProposalSerialization.pollState( lockId );
+
+            //done, short cuircuit
+            if(lockState.isLocked( lockUUID )){
+                return true;
+            }
+
+            //otherwise loop again and wait
+
+        }
+
+        return false;
+    }
+
     @Override
     public void unlock() {
-
+        //unlock and delete
+        this.lockProposalSerialization.delete( lockId, lockUUID );
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockCandidate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockCandidate.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockCandidate.java
index 1d90552..5ced3f6 100644
--- a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockCandidate.java
+++ b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockCandidate.java
@@ -44,28 +44,34 @@ public class LockCandidate {
 
     /**
      * Return true if the proposedUuid is the first UUID
-     * @param proposedUuid
-     * @return
      */
-    public boolean isFirst(final UUID proposedUuid){
-        return UUIDComparator.staticCompare( first, proposedUuid ) ==0 ;
+    public boolean isFirst( final UUID proposedUuid ) {
+        return UUIDComparator.staticCompare( first, proposedUuid ) == 0;
     }
 
 
     /**
      * Return trus if the proposedUuid is the
-     * @param proposedUuid
-     * @return
      */
-    public boolean isLocked(final UUID proposedUuid){
-        if(!isFirst( proposedUuid )) {
+    public boolean isLocked( final UUID proposedUuid ) {
+        /**
+         * The first uuid in the list, we might have the lock.  Proceed with further checks, otherwise short circuit
+         */
+        if ( !isFirst( proposedUuid ) ) {
             return false;
         }
 
 
+        /**
+         *  Check if we need to poll, if so, we can't have the lock, we need to check to ensure the send proposal has seen us
+         */
+        if ( shouldPoll( proposedUuid ) ) {
+            return false;
+        }
+
         //we have an "acked" value from a previous time uuid.  Compare them and be sure the
         //second has recognized the first as the being first, otherwise we can't proceed.
-        if(secondAcked.isPresent()) {
+        if ( secondAcked.isPresent() ) {
             return UUIDComparator.staticCompare( first, secondAcked.get() ) == 0;
         }
 
@@ -75,27 +81,37 @@ public class LockCandidate {
 
 
     /**
+     * Return true if we may have a lock, but just need to be acknowldeged
+     */
+    public boolean shouldPoll( final UUID proposedUuid ) {
+
+        //we have a second, but it hasn't acknowledged the first (us), we can't lock
+        if ( isFirst( proposedUuid ) && second.isPresent() && !secondAcked.isPresent() ) {
+            return true;
+        }
+
+        return false;
+    }
+
+
+    /**
      * Optionally return the value to ack.  Will be absent if an ack is not required
-     * @return
      */
-    public Optional<UUID> getValueToAck(final UUID proposedUuid){
+    public Optional<UUID> getValueToAck( final UUID proposedUuid ) {
 
-         //no second, or the second isn't equal to our proposed
-        if(!second.isPresent() || UUIDComparator.staticCompare( proposedUuid, second.get()) != 0){
+        //no second, or the second isn't equal to our proposed
+        if ( !second.isPresent() || UUIDComparator.staticCompare( proposedUuid, second.get() ) != 0 ) {
             return Optional.absent();
         }
 
         /**
          * If our second has already been acked, it's absent
          */
-        if(secondAcked.isPresent()){
+        if ( secondAcked.isPresent() ) {
             return Optional.absent();
         }
 
         //send back the first to ack
         return Optional.of( first );
-
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerialization.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerialization.java
new file mode 100644
index 0000000..f607c51
--- /dev/null
+++ b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerialization.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.locks.impl;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.locks.LockId;
+
+
+/**
+ * Interface for serializing node shard proposals
+ */
+public interface LockProposalSerialization extends Migration {
+
+
+    /**
+     * Propose a new shard and ack shards that are before us
+     * @param lockId The key for the locks
+     * @param proposed The proposed time uuid key
+     * @param expirationInSeconds The time to allow the proposal to live.
+     *
+     * @return The Proposal of the 2 items in the proposal queue
+     */
+    LockCandidate writeNewValue( final LockId lockId, final UUID proposed, final int expirationInSeconds );
+
+
+    /**
+     * Ack the proposal and re-read
+     * @param lockId
+     * @param proposed The proposed uuid we set
+     * @param seen The uuid to set into the seen value
+     * @param expirationInSeconds The time to allow the proposal to live.
+     * @return
+     */
+    LockCandidate ackProposed(final LockId lockId, final UUID proposed, final UUID seen, final int expirationInSeconds);
+
+    /**
+     * Poll the state of the current lock
+     * @param lockId
+     * @return
+     */
+    LockCandidate pollState(final LockId lockId);
+
+    /**
+     * Remove all the proposals
+     * @param lockId The key for the locks
+     * @param proposed The proposed value
+     */
+    void delete(  final LockId lockId, final UUID proposed );
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerializationImpl.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerializationImpl.java
new file mode 100644
index 0000000..74dfa34
--- /dev/null
+++ b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerializationImpl.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.locks.impl;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UUIDType;
+
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.astyanax.StringRowCompositeSerializer;
+import org.apache.usergrid.persistence.locks.LockId;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.serializers.UUIDSerializer;
+
+
+/**
+ * Serialization of locks
+ */
+@Singleton
+public class LockProposalSerializationImpl implements LockProposalSerialization {
+
+    private static final StringRowCompositeSerializer STRING_SER = StringRowCompositeSerializer.get();
+
+    private static final ScopedRowKeySerializer<String> ROW_KEY_SER = new ScopedRowKeySerializer<>( STRING_SER );
+
+
+    private static final MultiTennantColumnFamily<ScopedRowKey<String>, UUID> CF_MULTI_REGION_LOCKS =
+        new MultiTennantColumnFamily<>( "Multi_Region_Locks", ROW_KEY_SER, UUIDSerializer.get() );
+
+
+    private static final byte[] EMPTY = new byte[0];
+
+
+    protected final Keyspace keyspace;
+    protected final LockConsistency lockConsistency;
+
+
+    @Inject
+    public LockProposalSerializationImpl( final Keyspace keyspace, final LockConsistency lockConsistency ) {
+        this.keyspace = keyspace;
+        this.lockConsistency = lockConsistency;
+    }
+
+
+    @Override
+    public LockCandidate writeNewValue( final LockId lockId, final UUID proposed, final int expirationInSeconds ) {
+
+        final MutationBatch batch =
+            keyspace.prepareMutationBatch().withConsistencyLevel( lockConsistency.getShardWriteConsistency() );
+
+        final Id applicationId = lockId.getApplicationScope().getApplication();
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( applicationId, lockId.generateKey() );
+
+        //put the column with expiration
+        batch.withRow( CF_MULTI_REGION_LOCKS, rowKey ).putColumn( proposed, EMPTY, expirationInSeconds );
+
+
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+
+
+        return readState( rowKey );
+    }
+
+
+    @Override
+    public LockCandidate ackProposed( final LockId lockId, final UUID proposed, final UUID seen,
+                                      final int expirationInSeconds ) {
+
+        final MutationBatch batch =
+            keyspace.prepareMutationBatch().withConsistencyLevel( lockConsistency.getShardWriteConsistency() );
+
+        final Id applicationId = lockId.getApplicationScope().getApplication();
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( applicationId, lockId.generateKey() );
+
+
+        //put the column with expiration
+        batch.withRow( CF_MULTI_REGION_LOCKS, rowKey ).putColumn( proposed, seen, expirationInSeconds );
+
+
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+
+
+        return readState( rowKey );
+    }
+
+
+    @Override
+    public LockCandidate pollState( final LockId lockId ) {
+
+        final Id applicationId = lockId.getApplicationScope().getApplication();
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( applicationId, lockId.generateKey() );
+
+        return readState( rowKey );
+    }
+
+
+    /**
+     * Read the lock state from the column family
+     */
+    private LockCandidate readState( final ScopedRowKey<String> rowKey ) {
+        //read the first 2 records
+
+
+        final ColumnList<UUID> results;
+
+        try {
+            results = keyspace.prepareQuery( CF_MULTI_REGION_LOCKS )
+                              .setConsistencyLevel( lockConsistency.getShardReadConsistency() ).getKey( rowKey )
+                              .withColumnRange( ( UUID ) null, null, false, 2 ).execute().getResult();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+
+
+        //should never happen, sanity check.
+        if ( results.isEmpty() ) {
+            throw new RuntimeException(
+                "Unable to read results from cassandra.  There should be at least 1 result left" );
+        }
+
+
+        final UUID proposedLock = results.getColumnByIndex( 0 ).getName();
+
+        //we have 2 columns, populate the proposal
+        if ( results.size() == 2 ) {
+            final Column<UUID> column = results.getColumnByIndex( 1 );
+
+            final Optional<UUID> secondProposedLock = Optional.of( column.getName() );
+
+            final Optional<UUID> valueSeenBySecond;
+
+            if ( column.hasValue() ) {
+                valueSeenBySecond = Optional.of( column.getUUIDValue() );
+            }
+            else {
+                valueSeenBySecond = Optional.absent();
+            }
+
+            return new LockCandidate( proposedLock, secondProposedLock, valueSeenBySecond );
+        }
+
+        return new LockCandidate( proposedLock, Optional.absent(), Optional.absent() );
+    }
+
+
+    @Override
+    public void delete( final LockId lockId, final UUID proposed ) {
+        final MutationBatch batch =
+            keyspace.prepareMutationBatch().withConsistencyLevel( lockConsistency.getShardWriteConsistency() );
+
+        final Id applicationId = lockId.getApplicationScope().getApplication();
+
+        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( applicationId, lockId.generateKey() );
+
+
+        //put the column with expiration
+        batch.withRow( CF_MULTI_REGION_LOCKS, rowKey ).deleteColumn( proposed );
+
+
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+    }
+
+
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        //create the CF and sort them by uuid type so time uuid with lowest will be first
+        MultiTennantColumnFamilyDefinition cf =
+            new MultiTennantColumnFamilyDefinition( CF_MULTI_REGION_LOCKS, BytesType.class.getSimpleName(),
+                UUIDType.class.getSimpleName(), UUIDType.class.getSimpleName(),
+                MultiTennantColumnFamilyDefinition.CacheOption.ALL, Optional.of( 1 ) );
+
+
+        return Collections.singleton( cf );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerialization.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerialization.java
deleted file mode 100644
index 11314b2..0000000
--- a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerialization.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.persistence.locks.impl;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.migration.schema.Migration;
-import org.apache.usergrid.persistence.locks.LockId;
-
-
-/**
- * Interface for serializing node shard proposals
- */
-public interface NodeShardProposalSerialization extends Migration {
-
-
-    /**
-     * Propose a new shard and ack shards that are before us
-     * @param lockId The key for the locks
-     * @param proposed The proposed time uuid key
-     * @param expirationInSeconds The time to allow the proposal to live.
-     *
-     * @return The Proposal of the 2 items in the proposal queue
-     */
-    LockCandidate writeNewValue( final LockId lockId, final UUID proposed, final int expirationInSeconds );
-
-
-    /**
-     * Ack the proposal and re-read
-     * @param lockId
-     * @param proposed The proposed uuid we set
-     * @param seen The uuid to set into the seen value
-     * @param expirationInSeconds The time to allow the proposal to live.
-     * @return
-     */
-    LockCandidate ackProposed(final LockId lockId, final UUID proposed, final UUID seen, final int expirationInSeconds);
-
-    /**
-     * Remove all the proposals
-     * @param lockId The key for the locks
-     * @param proposed The proposed value
-     */
-    void delete(  final LockId lockId, final UUID proposed );
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerializationImpl.java b/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerializationImpl.java
deleted file mode 100644
index ceb60d6..0000000
--- a/stack/corepersistence/locks/src/main/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerializationImpl.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.persistence.locks.impl;
-
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.UUIDType;
-
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.StringRowCompositeSerializer;
-import org.apache.usergrid.persistence.locks.LockId;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.ColumnList;
-import com.netflix.astyanax.serializers.UUIDSerializer;
-
-
-/**
- * Serialization of locks
- */
-@Singleton
-public class NodeShardProposalSerializationImpl implements NodeShardProposalSerialization {
-
-    private static final StringRowCompositeSerializer STRING_SER = StringRowCompositeSerializer.get();
-
-    private static final ScopedRowKeySerializer<String> ROW_KEY_SER = new ScopedRowKeySerializer<>( STRING_SER );
-
-
-    private static final MultiTennantColumnFamily<ScopedRowKey<String>, UUID> CF_MULTI_REGION_LOCKS =
-        new MultiTennantColumnFamily<>( "Multi_Region_Locks", ROW_KEY_SER, UUIDSerializer.get() );
-
-
-    private static final byte[] EMPTY = new byte[0];
-
-
-    protected final Keyspace keyspace;
-    protected final LockConsistency lockConsistency;
-
-
-    @Inject
-    public NodeShardProposalSerializationImpl( final Keyspace keyspace, final LockConsistency lockConsistency ) {
-        this.keyspace = keyspace;
-        this.lockConsistency = lockConsistency;
-    }
-
-
-    @Override
-    public LockCandidate writeNewValue( final LockId lockId, final UUID proposed, final int expirationInSeconds ) {
-
-        final MutationBatch batch =
-            keyspace.prepareMutationBatch().withConsistencyLevel( lockConsistency.getShardWriteConsistency() );
-
-        final Id applicationId = lockId.getApplicationScope().getApplication();
-
-        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( applicationId, lockId.generateKey() );
-
-        //put the column with expiration
-        batch.withRow( CF_MULTI_REGION_LOCKS, rowKey ).putColumn( proposed, EMPTY, expirationInSeconds );
-
-
-        try {
-            batch.execute();
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to connect to cassandra", e );
-        }
-
-
-        return readState( rowKey );
-    }
-
-
-    @Override
-    public LockCandidate ackProposed( final LockId lockId, final UUID proposed, final UUID seen,
-                                      final int expirationInSeconds ) {
-
-        final MutationBatch batch =
-            keyspace.prepareMutationBatch().withConsistencyLevel( lockConsistency.getShardWriteConsistency() );
-
-        final Id applicationId = lockId.getApplicationScope().getApplication();
-
-        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( applicationId, lockId.generateKey() );
-
-
-        //put the column with expiration
-        batch.withRow( CF_MULTI_REGION_LOCKS, rowKey ).putColumn( proposed, seen, expirationInSeconds );
-
-
-        try {
-            batch.execute();
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to connect to cassandra", e );
-        }
-
-
-        return readState( rowKey );
-    }
-
-
-    /**
-     * Read the lock state from the column family
-     */
-    private LockCandidate readState( final ScopedRowKey<String> rowKey ) {
-        //read the first 2 records
-
-
-        final ColumnList<UUID> results;
-
-        try {
-            results = keyspace.prepareQuery( CF_MULTI_REGION_LOCKS )
-                              .setConsistencyLevel( lockConsistency.getShardReadConsistency() )
-                              .getKey( rowKey )
-                              .withColumnRange( ( UUID ) null, null, false, 2 ).execute().getResult();
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to connect to cassandra", e );
-        }
-
-
-        //should never happen, sanity check.
-        if ( results.isEmpty() ) {
-            throw new RuntimeException(
-                "Unable to read results from cassandra.  There should be at least 1 result left" );
-        }
-
-
-        final UUID proposedLock = results.getColumnByIndex( 0 ).getName();
-
-        //we have 2 columns, populate the proposal
-        if ( results.size() == 2 ) {
-            final Column<UUID> column = results.getColumnByIndex( 1 );
-
-            final Optional<UUID> secondProposedLock = Optional.of( column.getName() );
-
-            final Optional<UUID> valueSeenBySecond;
-
-            if ( column.hasValue() ) {
-                valueSeenBySecond = Optional.of( column.getUUIDValue() );
-            }
-            else {
-                valueSeenBySecond = Optional.absent();
-            }
-
-            return new LockCandidate( proposedLock, secondProposedLock, valueSeenBySecond );
-        }
-
-        return new LockCandidate( proposedLock, Optional.absent(), Optional.absent() );
-    }
-
-
-    @Override
-    public void delete( final LockId lockId, final UUID proposed ) {
-        final MutationBatch batch =
-            keyspace.prepareMutationBatch().withConsistencyLevel( lockConsistency.getShardWriteConsistency() );
-
-        final Id applicationId = lockId.getApplicationScope().getApplication();
-
-        final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( applicationId, lockId.generateKey() );
-
-
-        //put the column with expiration
-        batch.withRow( CF_MULTI_REGION_LOCKS, rowKey ).deleteColumn( proposed );
-
-
-        try {
-            batch.execute();
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to connect to cassandra", e );
-        }
-    }
-
-
-    @Override
-    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
-        //create the CF and sort them by uuid type so time uuid with lowest will be first
-        MultiTennantColumnFamilyDefinition cf =
-            new MultiTennantColumnFamilyDefinition( CF_MULTI_REGION_LOCKS, BytesType.class.getSimpleName(),
-                UUIDType.class.getSimpleName(), UUIDType.class.getSimpleName(),
-                MultiTennantColumnFamilyDefinition.CacheOption.ALL, Optional.of( 1 ) );
-
-
-        return Collections.singleton( cf );
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManagerTest.java b/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManagerTest.java
new file mode 100644
index 0000000..dd50b88
--- /dev/null
+++ b/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/CassandraLockManagerTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.locks.impl;
+
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.locks.Lock;
+import org.apache.usergrid.persistence.locks.LockId;
+import org.apache.usergrid.persistence.locks.guice.TestLockModule;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests the cassandra lock manager implementation
+ */
+@RunWith( ITRunner.class )
+@UseModules( { TestLockModule.class } )
+public class CassandraLockManagerTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( CassandraLockManagerTest.class );
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected CassandraLockManager cassandraLockManager;
+
+
+    protected ApplicationScope scope;
+
+
+    private static final int ONE_HOUR_TTL = 1;
+
+    private static final TimeUnit HOURS = TimeUnit.HOURS;
+
+
+    @Before
+    public void setup() {
+        scope = mock( ApplicationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getApplication() ).thenReturn( orgId );
+    }
+
+
+    @Test
+    public void testConcurrency() {
+
+        final int numConcurrent = 10;
+
+        final LockId sharedLock = createLockId();
+
+        final CountDownLatch countDownLatch = new CountDownLatch( numConcurrent );
+
+
+        final LockResults lockResults = Observable.range( 0, numConcurrent ).flatMap( input -> {
+            return Observable.just( input ).map( intValue -> {
+
+                try {
+                    countDownLatch.countDown();
+                    countDownLatch.await();
+                }
+                catch ( InterruptedException e ) {
+                    throw new RuntimeException( "Unable to countdown latch" );
+                }
+
+                final Lock lockId = cassandraLockManager.createMultiRegionLock( sharedLock );
+
+                final LockTuple lockTuple = new LockTuple( intValue, lockId, lockId.tryLock( ONE_HOUR_TTL, HOURS ) );
+
+
+                logger.info( "Result of lock index {} is {}", lockTuple.indexId, lockTuple.hasLock );
+
+                return lockTuple;
+            } ).subscribeOn( Schedulers.io() );
+        }, numConcurrent ).collect( () -> new LockResults(), ( results, lockTuple ) -> results.addLock( lockTuple ) )
+                                                  .toBlocking().last();
+
+
+        assertEquals( "Only 1 lock should be present", 1, lockResults.lockedTuples.size() );
+
+
+        final int expectedSize = numConcurrent - 1;
+
+        assertEquals( "Only 1 lock present", expectedSize, lockResults.unlockedTuples.size() );
+
+
+        //now get the 1 lock and ensure it's current
+
+        final LockTuple tuple = lockResults.lockedTuples.iterator().next();
+
+        //lets unlock and ensure it works.
+
+        tuple.lock.unlock();
+    }
+
+
+    private static final class LockTuple {
+
+        private final int indexId;
+
+        private final Lock lock;
+
+        private final boolean hasLock;
+
+
+        private LockTuple( final int indexId, final Lock lock, final boolean hasLock ) {
+            this.indexId = indexId;
+            this.lock = lock;
+            this.hasLock = hasLock;
+        }
+    }
+
+
+    private static final class LockResults {
+
+        private final Set<LockTuple> unlockedTuples;
+
+        private final Set<LockTuple> lockedTuples;
+
+
+        private LockResults() {
+            unlockedTuples = new HashSet<>();
+            lockedTuples = new HashSet<>();
+        }
+
+
+        /**
+         * Add the lock to the tuple
+         */
+        public void addLock( final LockTuple lock ) {
+            if ( lock.hasLock ) {
+                lockedTuples.add( lock );
+            }
+            else {
+                unlockedTuples.add( lock );
+            }
+        }
+    }
+
+
+    @Test( expected = UnsupportedOperationException.class )
+    public void failLocalLocks() {
+
+        final LockId sharedLock = createLockId();
+
+
+        //should throw an unsupported operation exception
+        cassandraLockManager.createLocalLock( sharedLock );
+    }
+
+
+    @Test
+    public void testLockUnlock() {
+
+        final LockId sharedLock = createLockId();
+
+
+        ///
+        final Lock lock1 = cassandraLockManager.createMultiRegionLock( sharedLock );
+
+        //even though it's second, it should lock if we call first
+        final Lock lock2 = cassandraLockManager.createMultiRegionLock( sharedLock );
+
+        //lock #3
+        final Lock lock3 = cassandraLockManager.createMultiRegionLock( sharedLock );
+
+
+        assertTrue( lock2.tryLock( ONE_HOUR_TTL, HOURS ) );
+
+        assertFalse( lock1.tryLock( ONE_HOUR_TTL, HOURS ) );
+
+        assertFalse(lock3.tryLock( ONE_HOUR_TTL, HOURS ));
+
+        //now unlock lock 2 and try again
+
+        lock2.unlock();
+
+        assertTrue( lock1.tryLock( ONE_HOUR_TTL, HOURS ) );
+
+        assertFalse( lock3.tryLock( ONE_HOUR_TTL, HOURS ) );
+
+        //now unlock 1 and lock 3
+
+        lock1.unlock();
+
+        assertTrue(lock3.tryLock(ONE_HOUR_TTL, HOURS));
+    }
+
+
+    private LockId createLockId() {
+
+        LockId lockId = mock( LockId.class );
+        //mock up scope
+        when( lockId.getApplicationScope() ).thenReturn( scope );
+
+        when( lockId.generateKey() ).thenReturn( UUIDGenerator.newTimeUUID() + "" );
+
+        return lockId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerializationTest.java b/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerializationTest.java
new file mode 100644
index 0000000..27147cf
--- /dev/null
+++ b/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/LockProposalSerializationTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.locks.impl;
+
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.locks.LockId;
+import org.apache.usergrid.persistence.locks.guice.TestLockModule;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@RunWith( ITRunner.class )
+@UseModules( { TestLockModule.class } )
+public class LockProposalSerializationTest {
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected LockProposalSerialization serialization;
+
+
+    protected ApplicationScope scope;
+
+    protected final AtomicLong atomicLong = new AtomicLong();
+
+
+    private static final int ONE_HOUR_TTL = 360;
+
+
+    @Before
+    public void setup() {
+        scope = mock( ApplicationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getApplication() ).thenReturn( orgId );
+    }
+
+
+    @Test
+    public void testOnlyLock() {
+
+        final LockId testId = createLockId();
+        final UUID proposed = UUIDGenerator.newTimeUUID();
+
+
+        final LockCandidate candidate = serialization.writeNewValue( testId, proposed, ONE_HOUR_TTL );
+
+
+        assertNotNull( candidate );
+
+        assertTrue( "First written value is the only proposal", candidate.isFirst( proposed ) );
+
+        assertTrue( "No second value present", candidate.isLocked( proposed ) );
+        assertFalse( "Should not ack", candidate.getValueToAck( proposed ).isPresent() );
+    }
+
+
+    @Test
+    public void testTwoLocks() {
+
+        final LockId testId = createLockId();
+        final UUID proposedFirst = UUIDGenerator.newTimeUUID();
+
+        final UUID proposedSecond = UUIDGenerator.newTimeUUID();
+
+
+        final LockCandidate candidateFirst = serialization.writeNewValue( testId, proposedFirst, ONE_HOUR_TTL );
+
+        final LockCandidate candidateSecond = serialization.writeNewValue( testId, proposedSecond, ONE_HOUR_TTL );
+
+        assertNotNull( candidateFirst );
+
+        assertTrue( "First written value is the only proposal", candidateFirst.isFirst( proposedFirst ) );
+
+        assertTrue( "No second value present", candidateFirst.isLocked( proposedFirst ) );
+
+        assertFalse( "Should not ack", candidateFirst.getValueToAck( proposedFirst ).isPresent() );
+
+        assertTrue( "Lock is present", candidateFirst.isLocked( proposedFirst ) );
+
+
+        assertNotNull( candidateSecond );
+
+        assertFalse( "Second candidate not present", candidateSecond.isFirst( proposedSecond ) );
+
+        assertFalse( "Second does not have lock", candidateSecond.isLocked( proposedSecond ) );
+
+        assertEquals( "Second should ack", proposedFirst, candidateSecond.getValueToAck( proposedSecond ).get() );
+
+        assertFalse( "Lock is not present", candidateSecond.isLocked( proposedSecond ) );
+
+
+        //now remove first, second should get lock
+
+        final LockCandidate ackedResponse =
+            serialization.ackProposed( testId, proposedSecond, proposedFirst, ONE_HOUR_TTL );
+
+
+        //now check if we can lock, still, we should be able to
+
+        assertTrue( "First written value is the only proposal", ackedResponse.isFirst( proposedFirst ) );
+
+        assertTrue( "No second value present", ackedResponse.isLocked( proposedFirst ) );
+
+        assertFalse( "Should not ack", ackedResponse.getValueToAck( proposedFirst ).isPresent() );
+
+        assertTrue( "Lock is present", ackedResponse.isLocked( proposedFirst ) );
+
+
+        //
+        assertFalse( "Second candidate is present", ackedResponse.isFirst( proposedSecond ) );
+
+        assertFalse( "Second does not have lock", ackedResponse.isLocked( proposedSecond ) );
+
+        //now we don't need to ack
+        assertFalse( "Second should ack", ackedResponse.getValueToAck( proposedSecond ).isPresent() );
+
+        assertFalse( "Lock is not present", ackedResponse.isLocked( proposedSecond ) );
+    }
+
+
+
+    @Test
+    public void testTwoLocksFirstDeleted() {
+
+        final LockId testId = createLockId();
+        final UUID proposedFirst = UUIDGenerator.newTimeUUID();
+
+        final UUID proposedSecond = UUIDGenerator.newTimeUUID();
+
+
+        final LockCandidate candidateFirst = serialization.writeNewValue( testId, proposedFirst, ONE_HOUR_TTL );
+
+        final LockCandidate candidateSecond = serialization.writeNewValue( testId, proposedSecond, ONE_HOUR_TTL );
+
+        assertNotNull( candidateFirst );
+
+        assertTrue( "First written value is the only proposal", candidateFirst.isFirst( proposedFirst ) );
+
+        assertTrue( "No second value present", candidateFirst.isLocked( proposedFirst ) );
+
+        assertFalse( "Should not ack", candidateFirst.getValueToAck( proposedFirst ).isPresent() );
+
+
+        assertNotNull( candidateSecond );
+
+        assertFalse( "Second candidate not present", candidateSecond.isFirst( proposedSecond ) );
+
+        assertFalse( "Second does not have lock", candidateSecond.isLocked( proposedSecond ) );
+
+        assertEquals( "Second should ack", proposedFirst,  candidateSecond.getValueToAck( proposedSecond ).get() );
+
+
+        //now remove first, second should get lock
+
+        final LockCandidate ackedResponse =
+            serialization.ackProposed( testId, proposedSecond, proposedFirst, ONE_HOUR_TTL );
+
+
+        //now check if we can lock, still, we should be able to
+
+        assertTrue( "First written value is the only proposal", ackedResponse.isFirst( proposedFirst ) );
+
+        assertTrue( "No second value present", ackedResponse.isLocked( proposedFirst ) );
+
+        assertFalse( "Should not ack", ackedResponse.getValueToAck( proposedFirst ).isPresent() );
+
+        //
+        assertFalse( "Second candidate is present", ackedResponse.isFirst( proposedSecond ) );
+
+        assertFalse( "Second does not have lock", ackedResponse.isLocked( proposedSecond ) );
+
+        //now we don't need to ack
+        assertFalse( "Second should ack", ackedResponse.getValueToAck( proposedSecond ).isPresent() );
+    }
+
+
+    private LockId createLockId() {
+
+        LockId lockId = mock( LockId.class );
+        //mock up scope
+        when( lockId.getApplicationScope() ).thenReturn( scope );
+
+        when( lockId.generateKey() ).thenReturn( atomicLong.incrementAndGet() + "" );
+
+        return lockId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dc3e9d19/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerializationTest.java b/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerializationTest.java
deleted file mode 100644
index b33aaac..0000000
--- a/stack/corepersistence/locks/src/test/java/org/apache/usergrid/persistence/locks/impl/NodeShardProposalSerializationTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.persistence.locks.impl;
-
-
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.locks.LockId;
-import org.apache.usergrid.persistence.locks.guice.TestLockModule;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Inject;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-@RunWith( ITRunner.class )
-@UseModules( { TestLockModule.class } )
-public class NodeShardProposalSerializationTest {
-
-    private static final Logger log = LoggerFactory.getLogger( NodeShardProposalSerializationTest.class );
-
-    @Inject
-    @Rule
-    public MigrationManagerRule migrationManagerRule;
-
-
-    @Inject
-    protected NodeShardProposalSerialization serialization;
-
-
-    protected ApplicationScope scope;
-
-    protected final AtomicLong atomicLong = new AtomicLong();
-
-
-    private static final int ONE_HOUR_TTL = 360;
-
-
-    @Before
-    public void setup() {
-        scope = mock( ApplicationScope.class );
-
-        Id orgId = mock( Id.class );
-
-        when( orgId.getType() ).thenReturn( "organization" );
-        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
-        when( scope.getApplication() ).thenReturn( orgId );
-    }
-
-
-    @Test
-    public void testOnlyLock() {
-
-        final LockId testId = createLockId();
-        final UUID proposed = UUIDGenerator.newTimeUUID();
-
-
-        final LockCandidate candidate = serialization.writeNewValue( testId, proposed, ONE_HOUR_TTL );
-
-
-        assertNotNull( candidate );
-
-        assertTrue( "First written value is the only proposal", candidate.isFirst( proposed ) );
-
-        assertTrue( "No second value present", candidate.isLocked( proposed ) );
-        assertFalse( "Should not ack", candidate.getValueToAck( proposed ).isPresent() );
-    }
-
-
-    @Test
-    public void testTwoLocks() {
-
-        final LockId testId = createLockId();
-        final UUID proposedFirst = UUIDGenerator.newTimeUUID();
-
-        final UUID proposedSecond = UUIDGenerator.newTimeUUID();
-
-
-        final LockCandidate candidateFirst = serialization.writeNewValue( testId, proposedFirst, ONE_HOUR_TTL );
-
-        final LockCandidate candidateSecond = serialization.writeNewValue( testId, proposedSecond, ONE_HOUR_TTL );
-
-        assertNotNull( candidateFirst );
-
-        assertTrue( "First written value is the only proposal", candidateFirst.isFirst( proposedFirst ) );
-
-        assertTrue( "No second value present", candidateFirst.isLocked( proposedFirst ) );
-
-        assertFalse( "Should not ack", candidateFirst.getValueToAck( proposedFirst ).isPresent() );
-
-
-        assertNotNull( candidateSecond );
-
-        assertFalse( "Second candidate not present", candidateSecond.isFirst( proposedSecond ) );
-
-        assertFalse( "Second does not have lock", candidateSecond.isLocked( proposedSecond ) );
-
-        assertEquals( "Second should ack", proposedFirst,  candidateSecond.getValueToAck( proposedSecond ).get() );
-
-
-        //now remove first, second should get lock
-
-        final LockCandidate ackedResponse =
-            serialization.ackProposed( testId, proposedSecond, proposedFirst, ONE_HOUR_TTL );
-
-
-        //now check if we can lock, still, we should be able to
-
-        assertTrue( "First written value is the only proposal", ackedResponse.isFirst( proposedFirst ) );
-
-        assertTrue( "No second value present", ackedResponse.isLocked( proposedFirst ) );
-
-        assertFalse( "Should not ack", ackedResponse.getValueToAck( proposedFirst ).isPresent() );
-
-        //
-        assertFalse( "Second candidate is present", ackedResponse.isFirst( proposedSecond ) );
-
-        assertFalse( "Second does not have lock", ackedResponse.isLocked( proposedSecond ) );
-
-        //now we don't need to ack
-        assertFalse( "Second should ack", ackedResponse.getValueToAck( proposedSecond ).isPresent() );
-    }
-
-
-    private LockId createLockId() {
-
-        LockId lockId = mock( LockId.class );
-        //mock up scope
-        when( lockId.getApplicationScope() ).thenReturn( scope );
-
-        when( lockId.generateKey() ).thenReturn( atomicLong.incrementAndGet() + "" );
-
-        return lockId;
-    }
-}


Mime
View raw message