usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [11/50] incubator-usergrid git commit: Refactored to write to cassandra for data, and to SQS for the id of the data.
Date Mon, 16 Mar 2015 23:37:39 GMT
Refactored to write to cassandra for data, and to SQS for the id of the data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8d8eb060
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8d8eb060
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8d8eb060

Branch: refs/heads/two-dot-o
Commit: 8d8eb060e20bd9942d591227ae2cdd5337214b81
Parents: 9111d94
Author: Todd Nine <tnine@apigee.com>
Authored: Tue Mar 10 20:38:52 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue Mar 10 20:38:52 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/map/MapManager.java    |  35 ++--
 .../persistence/map/impl/MapManagerImpl.java    |   8 +
 .../persistence/map/impl/MapSerialization.java  |   9 +
 .../map/impl/MapSerializationImpl.java          |  93 +++++++++
 .../persistence/map/MapManagerTest.java         |  49 ++++-
 stack/corepersistence/queryindex/pom.xml        |   1 -
 .../index/IndexOperationMessage.java            |  36 +++-
 .../index/impl/BufferQueueSQSImpl.java          | 167 ++++++++++++++--
 .../persistence/index/impl/DeIndexRequest.java  |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 196 +++++++++++--------
 .../index/guice/TestIndexModule.java            |   4 +-
 .../index/impl/BufferQueueSQSImplTest.java      | 145 ++++++++++++++
 .../impl/EntityConnectionIndexImplTest.java     |   4 +-
 .../persistence/index/impl/EntityIndexTest.java |  21 +-
 .../queue/impl/SQSQueueManagerImpl.java         |  32 ++-
 15 files changed, 656 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index 62fe57d..69e0874 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -1,23 +1,26 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.usergrid.persistence.map;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 
@@ -33,6 +36,14 @@ public interface MapManager {
      */
     public String getString( final String key );
 
+
+    /**
+     * Get the values for all the keys.  If a value does not exist, it won't be present in the map
+     * @param keys
+     * @return
+     */
+    public Map<String, String> getStrings(final Collection<String> keys);
+
     /**
      * Return the string, null if not found
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index c077c7d..fb2e7ff 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.map.impl;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.map.MapManager;
@@ -51,6 +53,12 @@ public class MapManagerImpl implements MapManager {
 
 
     @Override
+    public Map<String, String> getStrings( final Collection<String> keys ) {
+        return mapSerialization.getStrings( scope, keys );
+    }
+
+
+    @Override
     public void putString( final String key, final String value ) {
           mapSerialization.putString( scope, key, value );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 6e7e328..2e958c2 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.map.impl;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
@@ -33,6 +35,13 @@ public interface MapSerialization extends Migration {
     public String getString( final MapScope scope, final String key );
 
     /**
+     * Get strings from the map
+     * @param keys
+     * @return
+     */
+    public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
+
+    /**
      * Return the string, null if not found
      */
     public void putString( final MapScope scope, final String key, final String value );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 715c202..825d636 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -18,9 +18,12 @@
  */
 
 package org.apache.usergrid.persistence.map.impl;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import com.google.common.base.Preconditions;
@@ -50,6 +53,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.model.Rows;
+import com.netflix.astyanax.query.ColumnFamilyQuery;
 import com.netflix.astyanax.serializers.BooleanSerializer;
 import com.netflix.astyanax.serializers.StringSerializer;
 
@@ -73,6 +79,9 @@ public class MapSerializationImpl implements MapSerialization {
         private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
 
 
+    private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
+
+
         /**
          * CFs where the row key contains the source node id
          */
@@ -126,6 +135,12 @@ public class MapSerializationImpl implements MapSerialization {
 
 
     @Override
+    public Map<String, String> getStrings(final MapScope scope,  final Collection<String> keys ) {
+        return getValues( scope, keys, STRING_RESULTS_BUILDER );
+    }
+
+
+    @Override
     public void putString( final MapScope scope, final String key, final String value ) {
         final RowOp op = new RowOp() {
             @Override
@@ -371,6 +386,49 @@ public class MapSerializationImpl implements MapSerialization {
     }
 
 
+    /**
+     * Get multiple values, using the string builder
+     * @param scope
+     * @param keys
+     * @param builder
+     * @param <T>
+     * @return
+     */
+    private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) {
+
+
+        final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
+
+        for(final String key: keys){
+             //add it to the entry
+            final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+
+            rowKeys.add( entryRowKey );
+
+        }
+
+
+
+          //now get all columns, including the "old row key value"
+          try {
+              final Rows<ScopedRowKey<MapEntryKey>, Boolean>
+                  rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true )
+                                                     .execute().getResult();
+
+
+             return builder.buildResults( rows );
+          }
+          catch ( NotFoundException nfe ) {
+              //nothing to return
+              return null;
+          }
+          catch ( ConnectionException e ) {
+              throw new RuntimeException( "Unable to connect to cassandra", e );
+          }
+      }
+
+
+
     private void executeBatch(MutationBatch batch) {
         try {
             batch.execute();
@@ -449,4 +507,39 @@ public class MapSerializationImpl implements MapSerialization {
             return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
         }
     }
+
+
+    /**
+     * Build the results from the row keys
+     * @param <T>
+     */
+    private static interface ResultsBuilder<T> {
+
+        public T buildResults(final  Rows<ScopedRowKey<MapEntryKey>, Boolean> rows);
+    }
+
+    public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{
+
+        @Override
+        public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
+            final int size = rows.size();
+
+            final Map<String, String> results = new HashMap<>(size);
+
+            for(int i = 0; i < size; i ++){
+
+                final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
+
+                final String value = row.getColumns().getStringValue( true, null );
+
+                if(value == null){
+                    continue;
+                }
+
+               results.put( row.getKey().getKey().key,  value );
+            }
+
+            return results;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
index df4394e..41286ab 100644
--- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
+++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
@@ -20,6 +20,9 @@
 package org.apache.usergrid.persistence.map;
 
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -34,9 +37,11 @@ import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.map.guice.TestMapModule;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
 
+import static junit.framework.TestCase.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -79,6 +84,47 @@ public class MapManagerTest {
 
 
     @Test
+    public void multiReadNoKey() {
+        MapManager mm = mmf.createMapManager( this.scope );
+
+        final String key = UUIDGenerator.newTimeUUID().toString();
+
+        final Map<String, String> results = mm.getStrings( Collections.singleton( key ) );
+
+        assertNotNull( results );
+
+        final String shouldBeMissing = results.get( key );
+
+        assertNull( shouldBeMissing );
+    }
+
+
+    @Test
+    public void writeReadStringBatch() {
+        MapManager mm = mmf.createMapManager( this.scope );
+
+        final String key1 = "key1";
+        final String value1 = "value1";
+
+        mm.putString( key1, value1 );
+
+
+        final String key2 = "key2";
+        final String value2 = "value2";
+
+        mm.putString( key2, value2 );
+
+
+        final Map<String, String> returned = mm.getStrings( Arrays.asList( key1, key2 ) );
+
+        assertNotNull( returned );
+
+        assertEquals( value1, returned.get( key1 ) );
+        assertEquals( value2, returned.get( key2 ) );
+    }
+
+
+    @Test
     public void writeReadStringTTL() throws InterruptedException {
 
         MapManager mm = mmf.createMapManager( this.scope );
@@ -106,8 +152,7 @@ public class MapManagerTest {
         //now read it should be gone
         final String timedOut = mm.getString( key );
 
-        assertNull("Value was not returned", timedOut);
-
+        assertNull( "Value was not returned", timedOut );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index a5fbf6a..af843ad 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -145,7 +145,6 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
             <version>${slf4j.version}</version>
-            <scope>test</scope>
         </dependency>
 
         <!-- common stuff, logging, etc.-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 7d8a859..a7388d6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -60,17 +60,17 @@ public class IndexOperationMessage implements Serializable {
 
 
     public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
-        indexRequests.addAll( indexRequests );
+        this.indexRequests.addAll( indexRequests );
     }
 
 
     public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
-        deIndexRequests.add( deIndexRequest );
+        this.deIndexRequests.add( deIndexRequest );
     }
 
 
     public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
-        deIndexRequests.addAll( deIndexRequests );
+        this.deIndexRequests.addAll( deIndexRequests );
     }
 
 
@@ -96,4 +96,34 @@ public class IndexOperationMessage implements Serializable {
     public BetterFuture<IndexOperationMessage> getFuture() {
         return containerFuture;
     }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final IndexOperationMessage that = ( IndexOperationMessage ) o;
+
+        if ( !deIndexRequests.equals( that.deIndexRequests ) ) {
+            return false;
+        }
+        if ( !indexRequests.equals( that.indexRequests ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = indexRequests.hashCode();
+        result = 31 * result + deIndexRequests.hashCode();
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index 833e045..c6acb36 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -23,56 +23,132 @@ package org.apache.usergrid.persistence.index.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.persistence.queue.QueueScope;
 import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 
+/**
+ * This is experimental at best.  Our SQS size limit is a problem.  We shouldn't use this for index operation. Only for
+ * performing
+ */
 @Singleton
 public class BufferQueueSQSImpl implements BufferQueue {
 
+    private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class );
+
     /** Hacky, copied from CPEntityManager b/c we can't access it here */
     public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
 
 
+    /**
+     * Set our TTL to 1 month.  This is high, but in the event of a bug, we want these entries to get removed
+     */
+    public static final int TTL = 60 * 60 * 24 * 30;
+
+    /**
+     * The name to put in the map
+     */
+    public static final String MAP_NAME = "esqueuedata";
+
+
     private static final String QUEUE_NAME = "es_queue";
 
+    private static SmileFactory SMILE_FACTORY = new SmileFactory();
+
+
+    static {
+        SMILE_FACTORY.delegateToTextual( true );
+    }
+
+
     private final QueueManager queue;
+    private final MapManager mapManager;
     private final IndexFig indexFig;
+    private final ObjectMapper mapper;
+    private final Meter readMeter;
+    private final Timer readTimer;
+    private final Meter writeMeter;
+    private final Timer writeTimer;
 
 
     @Inject
-    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig ) {
-        final QueueScope scope =
+    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig,
+                               final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
+        final QueueScope queueScope =
             new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
 
-        this.queue = queueManagerFactory.getQueueManager( scope );
+        this.queue = queueManagerFactory.getQueueManager( queueScope );
         this.indexFig = indexFig;
+
+        final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME );
+
+        this.mapManager = mapManagerFactory.createMapManager( scope );
+
+
+        this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" );
+        this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" );
+
+        this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" );
+        this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" );
+
+        this.mapper = new ObjectMapper( SMILE_FACTORY );
+        //pretty print, disabling for speed
+        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
     }
 
 
     @Override
     public void offer( final IndexOperationMessage operation ) {
 
+        final Timer.Context timer = this.writeTimer.time();
+        this.writeMeter.mark();
+
+        final UUID identifier = UUIDGenerator.newTimeUUID();
 
         try {
-            this.queue.sendMessage( operation );
+
+            final String payLoad = toString( operation );
+
+            //write to cassandra
+            this.mapManager.putString( identifier.toString(), payLoad, TTL );
+
+            //signal to SQS
+            this.queue.sendMessage( identifier );
             operation.getFuture().run();
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
         }
+        finally {
+            timer.stop();
+        }
     }
 
 
@@ -83,23 +159,71 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
         final int actualTake = Math.min( 10, takeSize );
 
-        List<QueueMessage> messages = queue
-            .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
-                IndexOperationMessage.class );
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
 
+            List<QueueMessage> messages = queue
+                .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                    String.class );
 
-        final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
 
-        for ( final QueueMessage message : messages ) {
 
-            final IndexOperationMessage messageBody = ( IndexOperationMessage ) message.getBody();
+            final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
 
-            SqsIndexOperationMessage operation = new SqsIndexOperationMessage(message,  messageBody );
+            final List<String> mapEntries = new ArrayList<>( messages.size() );
 
-            response.add( operation );
-        }
 
-        return response;
+            if(messages.size() == 0){
+                return response;
+            }
+
+            //add all our keys  for a single round trip
+            for ( final QueueMessage message : messages ) {
+                mapEntries.add( message.getBody().toString() );
+            }
+
+            //look up the values
+            final Map<String, String> values = mapManager.getStrings( mapEntries );
+
+
+            //load them into our response
+            for ( final QueueMessage message : messages ) {
+
+                final String key = message.getBody().toString();
+
+                //now see if the key was there
+                final String payload = values.get( key );
+
+                //the entry was not present in cassandra, ignore this message.  Failure should eventually kick it to
+                // a DLQ
+
+                if ( payload == null ) {
+                    continue;
+                }
+
+                final IndexOperationMessage messageBody;
+
+                try {
+                    messageBody = fromString( payload );
+                }
+                catch ( IOException e ) {
+                    logger.error( "Unable to deserialize message from string.  This is a bug", e );
+                    throw new RuntimeException( "Unable to deserialize message from string.  This is a bug", e );
+                }
+
+                SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody );
+
+                response.add( operation );
+            }
+
+            readMeter.mark( response.size() );
+            return response;
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
     }
 
 
@@ -107,7 +231,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
     public void ack( final List<IndexOperationMessage> messages ) {
 
         //nothing to do
-        if(messages.size() == 0){
+        if ( messages.size() == 0 ) {
             return;
         }
 
@@ -121,6 +245,19 @@ public class BufferQueueSQSImpl implements BufferQueue {
     }
 
 
+    /** Read the object from Base64 string. */
+    private IndexOperationMessage fromString( String s ) throws IOException {
+        IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class );
+        return o;
+    }
+
+
+    /** Write the object to a Base64 string. */
+    private String toString( IndexOperationMessage o ) throws IOException {
+        return mapper.writeValueAsString( o );
+    }
+
+
     /**
      * The message that subclasses our IndexOperationMessage.  holds a pointer to the original message
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
index c63c4df..9f3ce66 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -37,9 +37,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 public class DeIndexRequest implements BatchRequest {
 
-    public final String[] indexes;
-    public final String entityType;
-    public final String documentId;
+    public String[] indexes;
+    public String entityType;
+    public String documentId;
 
 
     public DeIndexRequest( final String[] indexes, final String entityType, final String documentId) {
@@ -49,6 +49,10 @@ public class DeIndexRequest implements BatchRequest {
     }
 
 
+    public DeIndexRequest() {
+    }
+
+
     @Override
     public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 8547889..3fc3e77 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -42,7 +42,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
+import rx.Subscription;
 import rx.functions.Action1;
+import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -60,17 +62,23 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final IndexFig config;
     private final FailureMonitorImpl failureMonitor;
     private final Client client;
-    private final Observable<List<IndexOperationMessage>> consumer;
+
     private final Timer flushTimer;
     private final Counter indexSizeCounter;
     private final Meter flushMeter;
     private final Timer produceTimer;
     private final BufferQueue bufferQueue;
 
+    //the actively running subscription
+    private Subscription subscription;
+
+    private  Observable<List<IndexOperationMessage>> consumer;
+
+    private Object mutex = new Object();
+
     @Inject
     public EsIndexBufferConsumerImpl( final IndexFig config,  final EsProvider
         provider, final MetricsFactory metricsFactory,   final BufferQueue bufferQueue ){
-        this.bufferQueue = bufferQueue;
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
         this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
         this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -78,81 +86,101 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.client = provider.getClient();
         this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
+        this.bufferQueue = bufferQueue;
+
 
 
-        final AtomicInteger countFail = new AtomicInteger();
         //batch up sets of some size and send them in batch
-        this.consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
-            @Override
-            public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
+          start();
+    }
+
+
+    public void start() {
+        synchronized ( mutex) {
 
-                //name our thread so it's easy to see
-                Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+            final AtomicInteger countFail = new AtomicInteger();
 
-                List<IndexOperationMessage> drainList;
-                do {
-                    try {
+            Observable<List<IndexOperationMessage>> consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
+                @Override
+                public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
 
+                    //name our thread so it's easy to see
+                    Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
 
-                        Timer.Context timer = produceTimer.time();
+                    List<IndexOperationMessage> drainList;
+                    do {
+                        try {
 
-                        drainList = bufferQueue
-                            .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
 
+                            Timer.Context timer = produceTimer.time();
 
-                        subscriber.onNext( drainList );
+                            drainList = bufferQueue.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
+                                TimeUnit.MILLISECONDS );
 
 
-                        timer.stop();
+                            subscriber.onNext( drainList );
 
-                        countFail.set( 0 );
-                    }
-                    catch ( EsRejectedExecutionException err ) {
-                        countFail.incrementAndGet();
-                        log.error(
-                            "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  "
-                                + "Failed {} consecutive times",
-                            config.getFailRefreshCount(), countFail.get() );
-
-                        //es  rejected the exception, sleep and retry in the queue
-                        try {
-                            Thread.sleep( config.getFailureRetryTime() );
+
+                            timer.stop();
+
+                            countFail.set( 0 );
                         }
-                        catch ( InterruptedException e ) {
-                            //swallow
+                        catch ( EsRejectedExecutionException err ) {
+                            countFail.incrementAndGet();
+                            log.error(
+                                "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  " + "Failed {} consecutive times", config.getFailRefreshCount(),
+                                countFail.get() );
+
+                            //es  rejected the exception, sleep and retry in the queue
+                            try {
+                                Thread.sleep( config.getFailureRetryTime() );
+                            }
+                            catch ( InterruptedException e ) {
+                                //swallow
+                            }
                         }
-                    }
-                    catch ( Exception e ) {
-                        int count = countFail.incrementAndGet();
-                        log.error( "failed to dequeue", e );
-                        if ( count > 200 ) {
-                            log.error( "Shutting down index drain due to repetitive failures" );
+                        catch ( Exception e ) {
+                            int count = countFail.incrementAndGet();
+                            log.error( "failed to dequeue", e );
+                            if ( count > 200 ) {
+                                log.error( "Shutting down index drain due to repetitive failures" );
+                            }
                         }
                     }
+                    while ( true );
                 }
-                while ( true );
-            }
-        } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
-            @Override
-            public void call( List<IndexOperationMessage> containerList ) {
-                if ( containerList.size() > 0 ) {
-                    flushMeter.mark( containerList.size() );
-                    Timer.Context time = flushTimer.time();
-                    execute( containerList );
-                    time.stop();
+            } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+                @Override
+                public void call( List<IndexOperationMessage> containerList ) {
+                    if ( containerList.size() > 0 ) {
+                        flushMeter.mark( containerList.size() );
+                        Timer.Context time = flushTimer.time();
+                        execute( containerList );
+                        time.stop();
+                    }
                 }
+            } )
+                //ack after we process
+                .doOnNext( new Action1<List<IndexOperationMessage>>() {
+                    @Override
+                    public void call( final List<IndexOperationMessage> indexOperationMessages ) {
+                        bufferQueue.ack( indexOperationMessages );
+                    }
+                } );
+
+            //start in the background
+            subscription = consumer.subscribe();
+        }
+    }
+
+
+    public void stop() {
+        synchronized ( mutex ) {
+            //stop consuming
+            if(subscription != null) {
+                subscription.unsubscribe();
             }
-} )
-            //ack after we process
-          .doOnNext( new Action1<List<IndexOperationMessage>>() {
-              @Override
-              public void call( final List<IndexOperationMessage> indexOperationMessages ) {
-                  bufferQueue.ack( indexOperationMessages );
-              }
-          } );
-
-        //start in the background
-        consumer.subscribe();
+        }
     }
 
     /**
@@ -165,43 +193,36 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         }
 
         //process and flatten all the messages to builder requests
-        Observable<BatchRequest> flattenMessages = Observable.from(operationMessages)
-            .subscribeOn(Schedulers.io())
-            .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
-                @Override
-                public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
-                    final Observable<DeIndexRequest> deIndex = Observable.from( operationMessage.getDeIndexRequests () );
-                    final Observable<IndexRequest> index = Observable.from( operationMessage.getIndexRequests() );
-
-                    return Observable.merge( deIndex, index );
-                }
-            } );
-
+        Observable<IndexOperationMessage> flattenMessages = Observable.from( operationMessages );
 
 
         //batch shard operations into a bulk request
-        flattenMessages.toList()
-            .doOnNext(new Action1<List<BatchRequest>>() {
-                @Override
-                public void call(List<BatchRequest> builders) {
-                    try {
-                        final BulkRequestBuilder bulkRequest = initRequest();
-                        for (BatchRequest builder : builders) {
-                            indexSizeCounter.dec();
+        flattenMessages.flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
+            @Override
+            public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) {
+                final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
+                final Observable<DeIndexRequest> deIndex = Observable.from( indexOperationMessage.getDeIndexRequests() );
 
-                            builder.doOperation( client, bulkRequest );
-                        }
-                        sendRequest(bulkRequest);
-                    }catch (Exception e){
-                        log.error("Failed while sending bulk",e);
-                    }
-                }
-            })
-            .toBlocking().lastOrDefault(null);
+                return Observable.merge( index, deIndex );
+            }
+        } )
+      //collection all the operations into a single stream
+       .collect( initRequest(), new Action2<BulkRequestBuilder, BatchRequest>() {
+           @Override
+           public void call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
+               batchRequest.doOperation( client, bulkRequestBuilder );
+           }
+       } )
+        //send the request off to ES
+        .doOnNext( new Action1<BulkRequestBuilder>() {
+            @Override
+            public void call( final BulkRequestBuilder bulkRequestBuilder ) {
+                sendRequest( bulkRequestBuilder );
+            }
+        } ).toBlocking().last();
 
         //call back all futures
         Observable.from(operationMessages)
-            .subscribeOn(Schedulers.io())
             .doOnNext(new Action1<IndexOperationMessage>() {
                 @Override
                 public void call(IndexOperationMessage operationMessage) {
@@ -211,6 +232,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             .toBlocking().lastOrDefault(null);
     }
 
+
     /**
      * initialize request
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 57c2fab..7d7a18d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -38,8 +38,8 @@ public class TestIndexModule extends TestModule {
         install( new IndexModule() {
             @Override
             public void wireBufferQueue() {
-                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
-//                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
+//                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
new file mode 100644
index 0000000..4a4672e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import static org.junit.Assert.*;
+
+
+
+
+@RunWith(EsRunner.class)
+@UseModules({ TestIndexModule.class })
+@NotThreadSafe
+public class BufferQueueSQSImplTest {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+    @Inject
+    private BufferQueueSQSImpl bufferQueueSQS;
+
+    @Inject
+    private EsIndexBufferConsumerImpl esIndexBufferConsumer;
+
+
+    @Before
+    public void stop() {
+        esIndexBufferConsumer.stop();
+    }
+
+
+    @After
+    public void after() {
+        esIndexBufferConsumer.start();
+    }
+
+
+
+
+    @Test
+    public void testMessageIndexing(){
+
+        final Map<String, Object> request1Data  = new HashMap<String, Object>() {{put("test", "testval1");}};
+        final IndexRequest indexRequest1 =  new IndexRequest( "testAlias1", "testType1", "testDoc1",request1Data );
+
+
+        final Map<String, Object> request2Data  = new HashMap<String, Object>() {{put("test", "testval2");}};
+        final IndexRequest indexRequest2 =  new IndexRequest( "testAlias2", "testType2", "testDoc2",request2Data );
+
+
+        //de-index request
+        final DeIndexRequest deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, "testType3", "testId3" );
+
+        final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, "testType4", "testId4" );
+
+
+
+
+        IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
+        indexOperationMessage.addIndexRequest( indexRequest1);
+        indexOperationMessage.addIndexRequest( indexRequest2);
+
+        indexOperationMessage.addDeIndexRequest( deIndexRequest1 );
+        indexOperationMessage.addDeIndexRequest( deIndexRequest2 );
+
+        bufferQueueSQS.offer( indexOperationMessage );
+
+        //wait for it to send to SQS
+        indexOperationMessage.getFuture().get();
+
+        //now get it back
+
+        final List<IndexOperationMessage> ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
+
+        assertTrue(ops.size() > 1);
+
+        final IndexOperationMessage returnedOperation = ops.get( 0 );
+
+         //get the operations out
+
+        final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests();
+
+        assertTrue(indexRequestSet.contains(indexRequest1));
+        assertTrue(indexRequestSet.contains(indexRequest2));
+
+
+        final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests();
+
+        assertTrue( deIndexRequests.contains( deIndexRequest1 ) );
+        assertTrue( deIndexRequests.contains( deIndexRequest2 ) );
+
+
+
+        //now ack the message
+
+        bufferQueueSQS.ack( ops );
+
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 215ff57..c5f3488 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -141,7 +141,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
 
 
         EsTestUtils.waitForTasks(personLikesIndex);
-        Thread.sleep( 30000 );
+        Thread.sleep( 2000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex
@@ -271,7 +271,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         personLikesIndex.refresh();
 
         EsTestUtils.waitForTasks( personLikesIndex );
-        Thread.sleep( 30000 );
+        Thread.sleep( 2000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex.search( searchScope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index a15053c..a2135a3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -24,12 +24,14 @@ import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.model.field.ArrayField;
 import org.apache.usergrid.persistence.model.field.EntityObjectField;
 import org.apache.usergrid.persistence.model.field.UUIDField;
 import org.apache.usergrid.persistence.model.field.value.EntityObject;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -69,8 +71,14 @@ public class EntityIndexTest extends BaseIT {
     @Inject
     public EntityIndexFactory eif;
 
+    //TODO T.N. Remove this when we move the cursor mapping back to core
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
     @Test
-    public void testIndex() throws IOException {
+    public void testIndex() throws IOException, InterruptedException {
         Id appId = new SimpleId( "application" );
 
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
@@ -86,7 +94,9 @@ public class EntityIndexTest extends BaseIT {
 
         entityIndex.refresh();
 
-        testQueries( indexScope, searchTypes,  entityIndex );
+        Thread.sleep(100000000);
+
+        testQueries( indexScope, searchTypes, entityIndex );
     }
 
     @Test
@@ -660,15 +670,12 @@ public class EntityIndexTest extends BaseIT {
 
         for ( int i = 0; i < size; i++ ) {
             final String middleName = "middleName" + UUIDUtils.newTimeUUID();
-            Map<String, Object> properties = new LinkedHashMap<String, Object>();
-            properties.put( "username", "edanuff" );
-            properties.put( "email", "ed@anuff.com" );
-            properties.put( "middlename", middleName );
 
             Map entityMap = new HashMap() {{
                 put( "username", "edanuff" );
                 put( "email", "ed@anuff.com" );
                 put( "middlename", middleName );
+                put( "created", System.nanoTime() );
             }};
 
             final Id userId = new SimpleId( "user" );
@@ -700,7 +707,7 @@ public class EntityIndexTest extends BaseIT {
 
         for ( int i = 0; i < expectedPages; i++ ) {
             //**
-            final Query query = Query.fromQL( "select *" );
+            final Query query = Query.fromQL( "select * order by created" );
             query.setLimit( limit );
 
             if ( cursor != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 10aa621..e2c5c1e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -48,20 +48,19 @@ import java.util.concurrent.TimeUnit;
 public class SQSQueueManagerImpl implements QueueManager {
     private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
 
-    private  AmazonSQSClient sqs;
-    private  QueueScope scope;
-    private  QueueFig fig;
+    private  final AmazonSQSClient sqs;
+    private  final QueueScope scope;
     private  ObjectMapper mapper;
     private static SmileFactory smileFactory = new SmileFactory();
 
-    private static LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
+    private LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
             .maximumSize(1000)
             .build(new CacheLoader<SqsLoader, Queue>() {
                        @Override
                        public Queue load(SqsLoader queueLoader) throws Exception {
                            Queue queue = null;
                            try {
-                               GetQueueUrlResult result = queueLoader.getClient().getQueueUrl(queueLoader.getKey());
+                               GetQueueUrlResult result = sqs.getQueueUrl(queueLoader.getKey());
                                queue = new Queue(result.getQueueUrl());
                            } catch (QueueDoesNotExistException queueDoesNotExistException) {
                                queue = null;
@@ -73,7 +72,7 @@ public class SQSQueueManagerImpl implements QueueManager {
                                String name = queueLoader.getKey();
                                CreateQueueRequest createQueueRequest = new CreateQueueRequest()
                                        .withQueueName(name);
-                               CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
+                               CreateQueueResult result = sqs.createQueue(createQueueRequest);
                                String url = result.getQueueUrl();
                                queue = new Queue(url);
                                LOG.info("Created queue with url {}", url);
@@ -85,7 +84,6 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     @Inject
     public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
-        this.fig = fig;
         this.scope = scope;
         try {
             UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
@@ -99,8 +97,7 @@ public class SQSQueueManagerImpl implements QueueManager {
 //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
         } catch ( Exception e ) {
-            LOG.warn("failed to setup SQS",e);
-//            throw new RuntimeException("Error setting up mapper", e);
+            throw new RuntimeException("Error setting up mapper", e);
         }
     }
 
@@ -127,14 +124,14 @@ public class SQSQueueManagerImpl implements QueueManager {
         }
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
-        LOG.info("Getting {} messages from {}", limit, url);
+        LOG.debug( "Getting {} messages from {}", limit, url);
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
         receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
         receiveMessageRequest.setWaitTimeSeconds(waitTime);
         ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
         List<Message> messages = result.getMessages();
-        LOG.info("Received {} messages from {}",messages.size(),url);
+        LOG.debug( "Received {} messages from {}", messages.size(), url);
         List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
         for (Message message : messages) {
             Object body ;
@@ -157,7 +154,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             return;
         }
         String url = getQueue().getUrl();
-        LOG.info("Sending Messages...{} to {}", bodies.size(), url);
+        LOG.debug( "Sending Messages...{} to {}", bodies.size(), url);
 
         SendMessageBatchRequest request = new SendMessageBatchRequest(url);
         List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
@@ -180,7 +177,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             return;
         }
         String url = getQueue().getUrl();
-        LOG.info("Sending Message...{} to {}",body.toString(),url);
+        LOG.debug( "Sending Message...{} to {}", body.toString(), url);
 
         final String stringBody = toString(body);
 
@@ -192,7 +189,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     @Override
     public void commitMessage(QueueMessage queueMessage) {
         String url = getQueue().getUrl();
-        LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
+        LOG.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url);
 
         sqs.deleteMessage(new DeleteMessageRequest()
                 .withQueueUrl(url)
@@ -203,7 +200,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     @Override
     public void commitMessages(List<QueueMessage> queueMessages) {
         String url = getQueue().getUrl();
-        LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
+        LOG.debug( "Commit messages {} to queue {}", queueMessages.size(), url);
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
         for(QueueMessage message : queueMessages){
             entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
@@ -233,16 +230,11 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     public class SqsLoader {
         private final String key;
-        private final AmazonSQSClient client;
 
         public SqsLoader(String key, AmazonSQSClient client) {
             this.key = key;
-            this.client = client;
         }
 
-        public AmazonSQSClient getClient() {
-            return client;
-        }
 
         public String getKey() {
             return key;


Mime
View raw message