usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [04/10] incubator-usergrid git commit: Finishes changes before tests
Date Wed, 20 May 2015 23:00:34 GMT
Finishes changes before tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/20c9b350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/20c9b350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/20c9b350

Branch: refs/heads/two-dot-o-dev
Commit: 20c9b3509cf96a6ecab1a45a2c572fd6a041e00d
Parents: cb179d3
Author: Todd Nine <tnine@apigee.com>
Authored: Thu May 14 17:19:27 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu May 14 17:19:27 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/InMemoryAsyncEventService.java  |   2 -
 .../index/EdgeScopeSerializer.java              |  41 ++++
 .../index/IndexProcessorFig.java                |   6 +-
 .../corepersistence/index/ReIndexService.java   |  75 +++---
 .../index/ReIndexServiceImpl.java               | 226 +++++++++++++++----
 .../pipeline/cursor/CursorSerializerUtil.java   |  54 ++++-
 .../pipeline/cursor/RequestCursor.java          |   9 +-
 .../pipeline/cursor/ResponseCursor.java         |  49 ++--
 .../pipeline/read/AbstractPathFilter.java       |  30 ---
 .../pipeline/read/CursorSeek.java               |  53 +++++
 .../rx/impl/AllEntityIdsObservable.java         |   4 +-
 .../rx/impl/AllEntityIdsObservableImpl.java     |   5 +-
 .../PerformanceEntityRebuildIndexTest.java      |   5 +-
 .../graph/serialization/EdgesObservable.java    |  21 +-
 .../serialization/impl/EdgesObservableImpl.java |   4 +-
 15 files changed, 422 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 96966bf..ddcf826 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -99,8 +99,6 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
     @Override
     public void index( final EntityIndexOperation entityIndexOperation ) {
-
-
         run(eventBuilder.index( entityIndexOperation ));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
new file mode 100644
index 0000000..2a6a5ac
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+
+
+/**
+ * Serialize our edge scope for cursors
+ */
+public class EdgeScopeSerializer extends AbstractCursorSerializer<EdgeScope> {
+
+
+    public static final EdgeScopeSerializer INSTANCE = new EdgeScopeSerializer();
+
+    @Override
+    protected Class<EdgeScope> getType() {
+        return EdgeScope.class;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index fe9d3fd..8e835e2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -78,9 +78,9 @@ public interface IndexProcessorFig extends GuicyFig {
     String getQueueImplementation();
 
 
-    @Default("30000")
-    @Key("elasticsearch.reindex.sample.interval")
-    long getReIndexSampleInterval();
+    @Default("10000")
+    @Key("elasticsearch.reindex.flush.interval")
+    int getUpdateInterval();
 
 
     @Default("false")

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index b25eca5..f8955dd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,22 +20,6 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-import rx.Observer;
-import rx.observables.ConnectableObservable;
-
-
 /**
  * An interface for re-indexing all entities in an application
  */
@@ -46,48 +30,75 @@ public interface ReIndexService {
      * Perform an index rebuild
      *
      * @param indexServiceRequestBuilder The builder to build the request
-     * @return
      */
-    IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder);
+    IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder
);
 
 
     /**
      * Generate a build for the index
-     * @return
      */
     IndexServiceRequestBuilder getBuilder();
 
+
+    /**
+     * Get the status of a job
+     * @param jobId The jobId returned during the rebuild index
+     * @return
+     */
+    IndexResponse getStatus( final String jobId );
+
+
     /**
      * The response when requesting a re-index operation
      */
     class IndexResponse {
-        final String cursor;
-        final ConnectableObservable<EdgeScope> indexedEdgecount;
+        final String jobId;
+        final String status;
+        final long numberProcessed;
+        final long lastUpdated;
+
+
+        public IndexResponse( final String jobId, final String status, final long numberProcessed,
+                              final long lastUpdated ) {
+            this.jobId = jobId;
+            this.status = status;
+            this.numberProcessed = numberProcessed;
+            this.lastUpdated = lastUpdated;
+        }
 
 
-        public IndexResponse( final String cursor, final ConnectableObservable<EdgeScope>
indexedEdgecount ) {
-            this.cursor = cursor;
-            this.indexedEdgecount = indexedEdgecount;
+        /**
+         * Get the jobId used to resume this operation
+         */
+        public String getJobId() {
+            return jobId;
+        }
+
+
+        /**
+         * Get the last updated time, as a long
+         * @return
+         */
+        public long getLastUpdated() {
+            return lastUpdated;
         }
 
 
         /**
-         * Get the cursor used to resume this operation
+         * Get the number of records processed
          * @return
          */
-        public String getCursor() {
-            return cursor;
+        public long getNumberProcessed() {
+            return numberProcessed;
         }
 
 
         /**
-         * Return the observable of all edges to be indexed.
-         *
-         * Note that after subscribing "connect" will need to be called to ensure that processing
begins
+         * Get the status
          * @return
          */
-        public ConnectableObservable<EdgeScope> getCount() {
-            return indexedEdgecount;
+        public String getStatus() {
+            return status;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index a2fa09a..d828fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,29 +20,32 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.concurrent.TimeUnit;
+import java.util.List;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
 import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
-import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
 
 
 @Singleton
@@ -51,14 +54,18 @@ public class ReIndexServiceImpl implements ReIndexService {
     private static final MapScope RESUME_MAP_SCOPE =
         new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
 
-    //Keep cursors to resume re-index for 1 day.  This is far beyond it's useful real world
implications anyway.
+    //Keep cursors to resume re-index for 10 days.  This is far beyond its useful real world implications anyway.
     private static final int INDEX_TTL = 60 * 60 * 24 * 10;
 
+    private static final String MAP_CURSOR_KEY = "cursor";
+    private static final String MAP_COUNT_KEY = "count";
+    private static final String MAP_STATUS_KEY = "status";
+    private static final String MAP_UPDATED_KEY = "lastUpdated";
+
 
     private final AllApplicationsObservable allApplicationsObservable;
     private final AllEntityIdsObservable allEntityIdsObservable;
     private final IndexProcessorFig indexProcessorFig;
-    private final RxTaskScheduler rxTaskScheduler;
     private final MapManager mapManager;
     private final AsyncEventService indexService;
 
@@ -66,69 +73,61 @@ public class ReIndexServiceImpl implements ReIndexService {
     @Inject
     public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable,
                                final MapManagerFactory mapManagerFactory,
-                               final AllApplicationsObservable allApplicationsObservable,
final IndexProcessorFig indexProcessorFig,
-                               final RxTaskScheduler rxTaskScheduler, final AsyncEventService
indexService ) {
+                               final AllApplicationsObservable allApplicationsObservable,
+                               final IndexProcessorFig indexProcessorFig, final AsyncEventService
indexService ) {
         this.allEntityIdsObservable = allEntityIdsObservable;
         this.allApplicationsObservable = allApplicationsObservable;
         this.indexProcessorFig = indexProcessorFig;
-        this.rxTaskScheduler = rxTaskScheduler;
         this.indexService = indexService;
 
         this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
     }
 
 
-
-
-
     @Override
     public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder
) {
 
-          //load our last emitted Scope if a cursor is present
-        if ( indexServiceRequestBuilder.getCursor().isPresent() ) {
-            throw new UnsupportedOperationException( "Build this" );
-        }
+        //load our last emitted Scope if a cursor is present
 
+        final Optional<EdgeScope> cursor = parseCursor( indexServiceRequestBuilder.getCursor()
);
+
+
+        final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
 
         final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
-        final Observable<ApplicationScope>  applicationScopes = appId.isPresent()?
Observable.just( appId.get() ) : allApplicationsObservable.getData();
 
 
+        Preconditions.checkArgument( !( cursor.isPresent() && appId.isPresent() ),
+            "You cannot specify an app id and a cursor.  When resuming with cursor you must
omit the appid" );
+
+        final Observable<ApplicationScope> applicationScopes = getApplications( cursor,
appId );
 
 
-        final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+        final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
 
         final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE
);
 
         //create an observable that loads each entity and indexes it, start it running with
publish
-        final ConnectableObservable<EdgeScope> runningReIndex =
-            allEntityIdsObservable.getEdgesToEntities( applicationScopes,
-                indexServiceRequestBuilder.getCollectionName() )
-
-                //for each edge, create our scope and index on it
-                .doOnNext( edge -> indexService.index(
-                    new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
-                        modifiedSince ) ) ).publish();
+        final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities(
applicationScopes,
+            indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
 
+            //for each edge, create our scope and index on it
+            .doOnNext( edge -> indexService.index(
+                new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
+                    modifiedSince ) ) );
 
 
         //start our sampler and state persistence
         //take a sample every sample interval to allow us to resume state with minimal loss
-        runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
-            rxTaskScheduler.getAsyncIOScheduler() )
-            .doOnNext( edge -> {
-
-//                final String serializedState = SerializableMapper.asString( edge );
-//
-//                mapManager.putString( newCursor, serializedState, INDEX_TTL );
-            } ).subscribe();
+        runningReIndex.buffer( indexProcessorFig.getUpdateInterval() )
+            //create our flushing collector and flush the edge scopes to it
+            .collect( () -> new FlushingCollector( jobId ),
+                ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer(
edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() )
+                //subscribe on our I/O scheduler and run the task
+            .subscribeOn( Schedulers.io() ).subscribe();
 
 
-        //start pushing to both
-        runningReIndex.connect();
-
-
-        return new IndexResponse( newCursor, runningReIndex );
+        return new IndexResponse( jobId, "Started", 0, 0 );
     }
 
 
@@ -136,6 +135,155 @@ public class ReIndexServiceImpl implements ReIndexService {
     public IndexServiceRequestBuilder getBuilder() {
         return new IndexServiceRequestBuilderImpl();
     }
+
+
+    @Override
+    public IndexResponse getStatus( final String jobId ) {
+        Preconditions.checkNotNull( jobId, "jobId must not be null" );
+        return getIndexResponse( jobId );
+    }
+
+
+    /**
+     * Simple collector that keeps a running count and flushes state every time a buffer is provided.  Writes final state when complete
+     */
+    private class FlushingCollector {
+
+        private final String jobId;
+        private long count;
+
+
+        private FlushingCollector( final String jobId ) {
+            this.jobId = jobId;
+        }
+
+
+        public void flushBuffer( final List<EdgeScope> buffer ) {
+            count += buffer.size();
+
+            //persist the last edge scope of this buffer as the cursor state
+            if ( buffer.size() > 0 ) {
+                writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
+            }
+
+            writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() );
+        }
+
+        public void complete(){
+            writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() );
+        }
+    }
+
+
+    /**
+     * Get the resume edge scope
+     *
+     * @param edgeScope The optional edge scope from the cursor
+     */
+    private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope
) {
+
+
+        if ( edgeScope.isPresent() ) {
+            return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+        }
+
+        return new CursorSeek<>( Optional.absent() );
+    }
+
+
+    /**
+     * Generate the observable of application scopes: resume from the cursor's app, the single requested app, or all apps
+     */
+    private Observable<ApplicationScope> getApplications( final Optional<EdgeScope>
cursor,
+                                                          final Optional<ApplicationScope>
appId ) {
+        //cursor is present use it and skip until we hit that app
+        if ( cursor.isPresent() ) {
+
+            final EdgeScope cursorValue = cursor.get();
+            //we have a cursor and an application scope that was used.
+            return allApplicationsObservable.getData().skipWhile(
+                applicationScope -> !cursorValue.getApplicationScope().equals( applicationScope
) );
+        }
+        //this is intentional.  The app id is only consulted when no cursor is present
+        else if ( appId.isPresent() ) {
+            return Observable.just( appId.get() );
+        }
+
+        return allApplicationsObservable.getData();
+    }
+
+
+    /**
+     * Load the string persisted under the cursor key and deserialize it to an edge scope; absent if none is stored
+     */
+    private Optional<EdgeScope> parseCursor( final Optional<String> cursor )
{
+
+        if ( !cursor.isPresent() ) {
+            return Optional.absent();
+        }
+
+        //NOTE(review): reads the raw cursor key, but writeCursorState stores under jobId + MAP_CURSOR_KEY -- confirm
+        final String persistedCursor = mapManager.getString( cursor.get() );
+
+        if ( persistedCursor == null ) {
+            return Optional.absent();
+        }
+
+        final JsonNode node = CursorSerializerUtil.fromString( persistedCursor );
+
+        final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper()
);
+
+        return Optional.of( edgeScope );
+    }
+
+
+    /**
+     * Write the cursor state to the map in cassandra
+     */
+    private void writeCursorState( final String jobId, final EdgeScope edge ) {
+
+        final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(),
edge );
+
+        final String serializedState = CursorSerializerUtil.asString( node );
+
+        mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, INDEX_TTL );
+    }
+
+
+    /**
+     * Write our state meta data into cassandra so everyone can see it
+     * @param jobId The job id used as the prefix of each key written
+     * @param status The status string stored under the status key
+     * @param processedCount The number processed so far, stored under the count key
+     * @param lastUpdated The timestamp stored under the last updated key
+     */
+    private void writeStateMeta( final String jobId, final String status, final long processedCount,
+                                 final long lastUpdated ) {
+
+        mapManager.putString( jobId + MAP_STATUS_KEY, status );
+        mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+        mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+    }
+
+
+    /**
+     * Build the index response from the status, processed count and last updated values stored for the job
+     * @param jobId The job id used as the prefix of the stored keys
+     * @return The response for the job.  Throws IllegalArgumentException when no status is stored for the jobId
+     */
+    private IndexResponse getIndexResponse( final String jobId ) {
+
+        final String status = mapManager.getString( jobId+MAP_STATUS_KEY );
+
+        if(status == null){
+            throw new IllegalArgumentException( "Could not find a job with id " + jobId );
+        }
+
+        final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
+
+        return new IndexResponse( jobId, status, processedCount, lastUpdated );
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
index fea0364..7acdd00 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
@@ -20,10 +20,15 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import com.fasterxml.jackson.core.Base64Variant;
-import com.fasterxml.jackson.core.Base64Variants;
+import java.io.IOException;
+import java.util.Base64;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Preconditions;
 
 
 /**
@@ -35,9 +40,54 @@ public class CursorSerializerUtil {
 
     private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY );
 
+    /**
+     * Arbitrary number, just meant to keep us from having a DoS issue
+     */
+    private static final int MAX_SIZE = 1024;
+
 
     public static ObjectMapper getMapper() {
         return MAPPER;
     }
 
+
+    /**
+     * Serialize the json node with the SMILE mapper and return the bytes as a URL-safe base64 encoded string
+     */
+    public static String asString( final JsonNode node ) {
+        final byte[] output;
+        try {
+            output = MAPPER.writeValueAsBytes( node );
+        }
+        catch ( JsonProcessingException e ) {
+            throw new RuntimeException( "Unable to create output from json node " + node, e );
+        }
+
+        //generate a base64 URL-safe string
+        final String value = Base64.getUrlEncoder().encodeToString( output );
+
+        return value;
+    }
+
+
+    /**
+     * Decode the URL-safe base64 string and parse the resulting bytes into a json node with the SMILE mapper
+     */
+    public static JsonNode fromString( final String base64EncodedJson ) {
+
+        Preconditions.checkArgument( base64EncodedJson.length() <= MAX_SIZE,
+            "Your cursor must be less than " + MAX_SIZE + " chars in length" );
+
+        final byte[] data = Base64.getUrlDecoder().decode( base64EncodedJson );
+
+        JsonNode jsonNode;
+        try {
+            jsonNode =  MAPPER.readTree( data );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to parse json node from string " + base64EncodedJson, e );
+        }
+
+        return jsonNode;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
index 870edbb..dc6ae71 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
@@ -37,10 +37,6 @@ import com.google.common.base.Preconditions;
  */
 public class RequestCursor {
 
-    /**
-     * Aritrary number, just meant to keep us from having a DOS issue
-     */
-    private static final int MAX_SIZE = 1024;
 
     private static final int MAX_CURSOR_COUNT = 100;
 
@@ -83,11 +79,8 @@ public class RequestCursor {
         try {
 
 
-            Preconditions.checkArgument( cursor.length() <= MAX_SIZE, "Your cursor must
be less than " + MAX_SIZE + " chars in length");
-
-            final byte[] data = Base64.getUrlDecoder().decode( cursor );
 
-            JsonNode jsonNode = MAPPER.readTree( data );
+            JsonNode jsonNode = CursorSerializerUtil.fromString( cursor );
 
 
             Preconditions

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index dbd8b88..dc4bf39 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,11 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import java.util.Base64;
-
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -53,8 +50,8 @@ public class ResponseCursor {
 
 
     /**
-     * Lazyily encoded deliberately.  If the user doesn't care about a cursor and is using
streams, we dont' want to take the
-     * time to calculate it
+     * Lazily encoded deliberately.  If the user doesn't care about a cursor and is using streams, we don't want to
+     * take the time to calculate it
      */
     public Optional<String> encodeAsString() {
 
@@ -68,42 +65,34 @@ public class ResponseCursor {
             return encodedValue;
         }
 
+        //no edge path, short circuit
 
-        try {
-
-            //no edge path, short circuit
-
-            final ObjectNode map = MAPPER.createObjectNode();
+        final ObjectNode map = MAPPER.createObjectNode();
 
 
-            Optional<EdgePath> current = edgePath;
+        Optional<EdgePath> current = edgePath;
 
 
-            //traverse each edge and add them to our json
-            do {
+        //traverse each edge and add them to our json
+        do {
 
-                final EdgePath edgePath = current.get();
-                final Object cursorValue = edgePath.getCursorValue();
-                final CursorSerializer serializer = edgePath.getSerializer();
-                final int filterId = edgePath.getFilterId();
+            final EdgePath edgePath = current.get();
+            final Object cursorValue = edgePath.getCursorValue();
+            final CursorSerializer serializer = edgePath.getSerializer();
+            final int filterId = edgePath.getFilterId();
 
-                final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
-                map.put( String.valueOf( filterId ), serialized );
+            final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+            map.put( String.valueOf( filterId ), serialized );
 
-                current = current.get().getPrevious();
-            }
-            while ( current.isPresent() );
+            current = current.get().getPrevious();
+        }
+        while ( current.isPresent() );
 
-            final byte[] output = MAPPER.writeValueAsBytes( map );
+        //generate a base64 URL-safe string
+        final String value = CursorSerializerUtil.asString( map );
 
-            //generate a base64 url save string
-            final String value = Base64.getUrlEncoder().encodeToString( output );
+        encodedValue = Optional.of( value );
 
-            encodedValue =  Optional.of( value );
-        }
-        catch ( JsonProcessingException e ) {
-            throw new CursorParseException( "Unable to serialize cursor", e );
-        }
 
         return encodedValue;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
index c68dc4a..0f9ac9b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -76,34 +76,4 @@ public abstract class AbstractPathFilter<T, R, C extends Serializable>
extends A
      * Return the class to be used when parsing the cursor
      */
     protected abstract CursorSerializer<C> getCursorSerializer();
-
-
-    /**
-     * An internal class that holds a mutable state.  When resuming, we only ever honor the
seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted
values.
-     * Calling get will return the first value to seek, or absent if not specified.  Subsequent
calls will return absent.  Callers should treat the results as seek values for each operation
-     */
-    protected static class CursorSeek<C> {
-
-        private Optional<C> seek;
-
-        private CursorSeek(final Optional<C> cursorValue){
-            seek = cursorValue;
-        }
-
-
-        /**
-         * Get the seek value to use when searching
-         * @return
-         */
-        public Optional<C> getSeekValue(){
-            final Optional<C> toReturn = seek;
-
-            seek = Optional.absent();
-
-            return toReturn;
-        }
-
-
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
new file mode 100644
index 0000000..b803658
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A class that holds mutable state.  When resuming, we only ever honor the seek
value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
+ * Calling getSeekValue will return the first value to seek, or absent if not specified.  Subsequent
calls will return absent.  Callers should treat the results as seek values for each operation.
+ */
+public class CursorSeek<C> {
+
+    private Optional<C> seek;
+
+    public CursorSeek( final Optional<C> cursorValue ){
+        seek = cursorValue;
+    }
+
+
+    /**
+     * Get the seek value to use when searching
+     * @return the seek value on the first call; absent on every subsequent call
+     */
+    public Optional<C> getSeekValue(){
+        final Optional<C> toReturn = seek;
+
+        seek = Optional.absent();
+
+        return toReturn;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index aada240..9070609 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -24,6 +24,7 @@ import  com.google.common.base.Optional;
 
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
 
 import rx.Observable;
 
@@ -44,8 +45,9 @@ public interface AllEntityIdsObservable {
      * Get all edges that represent edges to entities in the system
      * @param appScopes
      * @param edgeType The edge type to use (if specified)
+     * @param lastEdge The edge to resume processing from
      * @return
      */
-    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope>
appScopes, final Optional<String> edgeType);
+    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope>
appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 6a95e7b..0420a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -28,6 +28,7 @@ import com.google.inject.Singleton;
 
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
@@ -81,12 +82,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable
{
 
 
     @Override
-    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope>
appScopes, final Optional<String> edgeType) {
+    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope>
appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {
 
         return appScopes.flatMap( applicationScope -> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope
);
 
-            return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(),
edgeType )
+            return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(),
edgeType, lastEdge )
                                   .map( edge -> new EdgeScope(applicationScope, edge ));
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index a17c925..cb9919f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Optional;
 import org.apache.commons.lang.RandomStringUtils;
 
+import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.junit.After;
@@ -196,7 +197,9 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT
{
 
         try {
 
-            reIndexService.rebuildIndex( Optional.of( em.getApplicationId()), Optional.<String>of("catherders"),
Optional.absent(), Optional.absent() );
+            final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId(
em.getApplicationId() ).withCollection( "catherders" );
+
+            reIndexService.rebuildIndex(builder );
 
             reporter.report();
             registry.remove( meterName );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 964e13d..78a1d4b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -42,16 +42,6 @@ public interface EdgesObservable {
 
 
     /**
-     * Return an observable of all edges from a source node.  Ordered ascending, from the
startTimestamp if specified
-     * @param gm
-     * @param sourceNode
-     * @param edgeType The edge type if specified.  Otherwise all types will be used
-     * @return
-     */
-    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
-                                                final Optional<String> edgeType );
-
-    /**
      * Get all edges from the source node with the target type
      * @param gm
      * @param sourceNode
@@ -67,4 +57,15 @@ public interface EdgesObservable {
      * @return
      */
     Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode);
+
+    /**
+     * Return an observable of all edges from a source node.  Ordered descending, resuming
after the given edge if specified
+     * @param gm
+     * @param sourceNode
+     * @param edgeType The edge type if specified.  Otherwise all types will be used
+     * @param resume The edge to start seeking after.  Otherwise starts at the most recent
+     * @return an observable of the edges from the source node
+     */
+    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+                                                final Optional<String> edgeType, final
Optional<Edge> resume );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 7240798..18274ac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -72,7 +72,7 @@ public class EdgesObservableImpl implements EdgesObservable {
 
     @Override
     public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final
Id sourceNode,
-                                                       final Optional<String> edgeTypeInput
) {
+                                                       final Optional<String> edgeTypeInput,
final Optional<Edge> resume  ) {
 
 
 
@@ -86,7 +86,7 @@ public class EdgesObservableImpl implements EdgesObservable {
 
                 return gm.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        Optional.<Edge>absent() ) );
+                       resume ) );
         } );
     }
 


Mime
View raw message