usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [5/5] incubator-usergrid git commit: WIP, overwrite
Date Wed, 01 Apr 2015 15:27:48 GMT
WIP, overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cee76da7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cee76da7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cee76da7

Branch: refs/heads/USERGRID-528
Commit: cee76da72c3343d562867988f76f264204d54b59
Parents: 61aa979
Author: Todd Nine <tnine@apigee.com>
Authored: Tue Mar 24 14:15:49 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue Mar 24 14:15:49 2015 -0600

----------------------------------------------------------------------
 .../io/read/EntityIndexCommand.java             | 404 ++++++++++++-------
 .../corepersistence/rx/impl/CollectUntil.java   |   2 +-
 2 files changed, 270 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cee76da7/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
index a8fa221..464dd45 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
@@ -22,8 +22,9 @@ package org.apache.usergrid.corepersistence.io.read;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -32,8 +33,8 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -52,32 +53,34 @@ import rx.functions.Func0;
 import rx.functions.Func1;
 
 
+/**
+ * Performs a search of the given type along the specified edgeName.  It then loads and validates
results,
+ * and returns an Observable of SearchResults
+ */
 @Singleton
 public class EntityIndexCommand implements Command<Id, EntityIndexCommand.SearchResults>
{
 
     private final Id applicationId;
-    private final ApplicationScope applicationScope;
     private final ApplicationEntityIndex index;
     private final SearchTypes types;
     private final String query;
     private final int resultSetSize;
-    private final String scopeType;
+    private final String edgeName;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
 
 
     @Inject
-    public EntityIndexCommand( final Id applicationId, final ApplicationScope applicationScope,
-                               final ApplicationEntityIndex index, final SearchTypes types,
final String query,
-                               final int resultSetSize, final String scopeType,
+    public EntityIndexCommand( final Id applicationId, final ApplicationEntityIndex index,
final SearchTypes types,
+                               final String query, final int resultSetSize, final String
edgeName,
                                final EntityCollectionManagerFactory entityCollectionManagerFactory
) {
         this.applicationId = applicationId;
-        this.applicationScope = applicationScope;
+
 
         this.index = index;
         this.types = types;
         this.query = query;
         this.resultSetSize = resultSetSize;
-        this.scopeType = scopeType;
+        this.edgeName = edgeName;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
     }
 
@@ -85,150 +88,216 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search
     @Override
     public Observable<EntityIndexCommand.SearchResults> call( final Observable<Id>
idObservable ) {
 
-        //create our observable of candidate search results
-        final Observable<CandidateResults> candidateResults = idObservable
-            .flatMap( id -> Observable.create( new ElasticSearchObservable( initialSearch(
id ), nextPage( id ) ) ) );
-
-        final Observable<CandidateResult> candidateObservable =
-            candidateResults.flatMap( candidates -> Observable.from( candidates ) );
-
-        //since we'll only have at most 100 results in memory at a time, we roll up our groups
and emit them on to
-        // the collector
-        final Observable<CandidateGroup> candidateGroup =
-            candidateObservable.groupBy( candidate -> candidate.getId() ).map( observableGroup
-> {
-
-
-                //for each group, create a list, then sort by highest version first
-                final List<CandidateResult> groupList = observableGroup.toList().toBlocking().last();
-
-                Collections.sort( groupList, CandidateVersionComparator::compare );
-
-                //create our candidate group and emit it
-                final CandidateGroup group =
-                    new CandidateGroup( groupList.get( 0 ), groupList.subList( 1, groupList.size()
) );
-
-                return group;
-            } );
-
+        //load all the potential candidates
+        final Observable<CandidateCollector> collectedCandidates = idObservable.compose(
createCandidates() );
 
-        //buffer our candidate group up to our resultset size.
-        final Observable<CandidateCollector> collectedCandidates =
-            candidateGroup.buffer( resultSetSize ).flatMap( candidates -> {
+        //now we have our collected candidates, load them from ES
+        final Observable<SearchResults> loadedEntities = collectedCandidates.map( loadEntities(
resultSetSize ) );
 
-                final Observable<CandidateCollector> collector = Observable.from( candidates
).collect(
-                    () -> new CandidateCollector( resultSetSize ), ( candidateCollector,
candidate ) -> {
-                        //add our candidates to our collector
-                        candidateCollector.addCandidate( candidate.toKeep );
-                        //add our empty results
-                        candidateCollector.addEmptyResults( candidate.toRemove );
-                    } );
-
-                return collector;
-            } );
 
-        //now we have our collected candidates, load them
+        //after we've loaded, remove everything that's left
+        loadedEntities.doOnNext( results ->{
 
+            //run the deindex
+            Observable.from( results.toRemove ).collect( () -> index.createBatch(), (batch,
toRemove) -> {
+              batch.deindex( toRemove.indexScope, toRemove.result );
+            }).doOnNext( batch -> batch.execute() ).toBlocking().lastOrDefault( null );
 
-        final Observable<SearchResults> loadedEntities = collectedCandidates.map( loadEntities(resultSetSize)
);
+        });
 
 
         return loadedEntities;
     }
 
 
-    /**
-     * Perform the initial search with the sourceId
-     */
-    private Func0<CandidateResults> initialSearch( final Id sourceId ) {
-        return () -> index.search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL(
query ) );
-    }
 
 
     /**
-     * Search the next page for the specified source
+     * Using an input of observable ids, transform and collect them into a set of CandidateCollector
to be loaded, or
+     * verified by the EM
      */
-    private Func1<String, CandidateResults> nextPage( final Id sourceId ) {
-        return cursor -> index
-            .search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query
).withCursor( cursor ) );
-    }
+    private Observable.Transformer<Id, CandidateCollector> createCandidates() {
+        return idObservable -> {
 
+            //create our observable of candidate search results
+            final Observable<ScopedCandidateResults> candidateResults = idObservable.flatMap(
 
-    /**
-     * Function that will load our entities from our candidates, filter stale or missing
candidates and return results
-     * @param expectedSize
-     * @return
-     */
-    private Func1<CandidateCollector, SearchResults> loadEntities(final int expectedSize)
{
-        return candidateCollector -> {
 
-            //from our candidates, group them by id type so we can create scopes
-            Observable.from( candidateCollector.getCandidates() ).groupBy( candidate ->
candidate.getId().getType() )
-                      .flatMap( groups -> {
+                id -> {
 
+                    //create the index scope from the id
+                    final IndexScope scope = createScope( id );
 
-                          final List<CandidateResult> candidates = groups.toList().toBlocking().last();
+                    //perform the ES search of candidate results
+                    final Observable<ScopedCandidateResults> scoped = Observable.create(
+                        new ElasticSearchObservable( initialSearch( scope ), nextPage( scope
) ) );
 
-                          //we can get no results, so quit aggregating if there are none
-                          if ( candidates.size() == 0 ) {
-                              return Observable.just( new SearchResults( 0 ) );
-                          }
 
+                    final Observable<CandidateCollector> collectedResults =  scoped.flatMap(
scopedCandidateResults -> {
 
-                          final String typeName = candidates.get( 0 ).getId().getType();
+                        //here b/c the compiler cannot infer type directly
+                        final Observable<CandidateResult> candidates =
+                            Observable.from( scopedCandidateResults.candidateResults );
 
-                          final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
typeName );
+                        return candidates.collect( () -> new CandidateCollector( resultSetSize,
+                            scopedCandidateResults.candidateResults.getCursor() ), ( collector,
candidate ) -> {
+                            collector.insert( candidate );
+                        } );
+                    } );
 
 
-                          //create our scope to get our entity manager
 
-                          final CollectionScope scope =
-                              new CollectionScopeImpl( applicationId, applicationId, collectionType
);
+                    //group our candidates by type
+//                   collectedResults.flatMap( collected -> Observable.from( collected.candidates
) ).groupBy( candidate -> candidate.getId().getType() ).co
 
-                          final EntityCollectionManager ecm =
-                              entityCollectionManagerFactory.createCollectionManager( scope
);
 
+                    final Observable<SearchResults> resultsObservable = collectedResults.map(
collectedCandidates -> {
 
-                          //get our entity ids
-                          final List<Id> entityIds =
-                              Observable.from( candidates ).map( c -> c.getId() ).toList().toBlocking().last();
 
-                          //TODO, change this out
+                        //we can get no results, so quit aggregating if there are none
+                        if ( collectedCandidates.candidates.size() == 0 ) {
+                            return new SearchResults( 0, collectedCandidates.cursor );
+                        }
 
-                          //an observable of our entity set
 
+                        final String collectionType =
+                            CpNamingUtils.getCollectionScopeNameFromEntityType( id.getType()
);
 
 
-                          //now go through all our candidates and verify
+                        //create our scope to get our entity manager
 
-                          return Observable.from( candidates ).collect(  () -> new SearchResults(
expectedSize ), (searchResults, candidate) ->{
+                        final CollectionScope collectionScope =
+                            new CollectionScopeImpl( applicationId, applicationId, collectionType
);
 
-                              final EntitySet entitySet = ecm.load( entityIds ).toBlocking().last();
+                        final EntityCollectionManager ecm =
+                            entityCollectionManagerFactory.createCollectionManager( collectionScope
);
 
-                              final MvccEntity entity = entitySet.getEntity( candidate.getId()
);
 
+                        //get our entity ids
+                        final List<Id> entityIds =
+                            Observable.from( collectedCandidates.candidates ).map( c ->
c.getId() ).toList()
+                                      .toBlocking().last();
 
-                              //our candidate it stale, or target entity was deleted add
it to the remove of our collector
-                              if(UUIDComparator.staticCompare( entity.getVersion(), candidate.getVersion())
> 0 || !entity.getEntity().isPresent()){
-                                  searchResults.addToRemove( candidate );
-                                  return;
-                              }
+                        //TODO, change this out
 
+                        //an observable of our entity set
 
-                              searchResults.addEntity( entity.getEntity().get() );
+                        return ecm.load( entityIds ).map( results -> {
 
+                            Observable.from( entityIds ).collect( () -> new SearchResults(
resultSetSize ),
+                                ( searchResults, entityIds ) -> {
+                                } );
+                        } );
+                    } );
 
-                          } )
-                              //add the existing set to remove to this set
-                              .doOnNext( results -> results.addToRemove( candidateCollector.getToRemove()
) );
+                    return resultsObservable;
+                } );
 
-                      } );
+            return candidateResults;
+        };
+    }
 
 
-            return null;
-        };
+    /**
+     * Perform the initial search within the given index scope
+     */
+    private Func0<ScopedCandidateResults> initialSearch( final IndexScope scope ) {
+        return () -> new ScopedCandidateResults(scope,  index.search(  scope, types, Query.fromQL(
query ) ) );
+    }
+
+
+    /**
+     * Search the next page for the specified source
+     */
+    private Func1<String, ScopedCandidateResults> nextPage( final IndexScope scope
 ) {
+        return cursor -> new ScopedCandidateResults(scope,   index.search( scope, types,
Query.fromQL( query ).withCursor(
+            cursor ) ) );
     }
 
 
+    /**
+     * Create the scope and return it
+     * @param sourceId
+     * @return
+     */
+    private IndexScope createScope(final Id sourceId){
+        return new IndexScopeImpl( sourceId, edgeName );
+    }
+
+//
+//    /**
+//     * Function that will load our entities from our candidates, filter stale or missing
candidates and return results
+//     */
+//    private Func1<CandidateCollector, SearchResults> loadEntities( final int expectedSize
) {
+//        return candidateCollector -> {
+//
+//            //from our candidates, group them by id type so we can create scopes
+//            Observable.from( candidateCollector.getCandidates() ).groupBy( candidate ->
candidate.result.getId().getType() )
+//                      .flatMap( groups -> {
+//
+//
+//                          final List<ScopedCandidateResult> candidates = groups.toList().toBlocking().last();
+//
+//                          //we can get no results, so quit aggregating if there are none
+//                          if ( candidates.size() == 0 ) {
+//                              return Observable.just( new SearchResults(  0, null ) );
+//                          }
+//
+//
+//                          final String typeName = candidates.get( 0 ).result.getId().getType();
+//
+//                          final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
typeName );
+//
+//
+//                          //create our scope to get our entity manager
+//
+//                          final CollectionScope scope =
+//                              new CollectionScopeImpl( applicationId, applicationId, collectionType
);
+//
+//                          final EntityCollectionManager ecm =
+//                              entityCollectionManagerFactory.createCollectionManager( scope
);
+//
+//
+//                          //get our entity ids
+//                          final List<Id> entityIds =
+//                              Observable.from( candidates ).map( c -> c.result.getId()
).toList().toBlocking().last();
+//
+//                          //TODO, change this out
+//
+//                          //an observable of our entity set
+//
+//
+//                          //now go through all our candidates and verify
+//
+//                          return Observable.from( candidates ).collect( () -> new SearchResults(
+//                                  expectedSize ),
+//                              ( searchResults, candidate ) -> {
+//
+//                                  final EntitySet entitySet = ecm.load( entityIds ).toBlocking().last();
+//
+//                                  final MvccEntity entity = entitySet.getEntity( candidate.result.getId()
);
+//
+//
+//                                  //our candidate is stale, or target entity was deleted
add it to the remove of our
+//                                  // collector
+//                                  if ( UUIDComparator.staticCompare( entity.getVersion(),
candidate.result.getVersion() ) > 0
+//                                      || !entity.getEntity().isPresent() ) {
+//
+//                                      searchResults.addToRemove( candidate  );
+//                                      return;
+//                                  }
+//
+//
+//                                  searchResults.addEntity( entity.getEntity().get() );
+//                              } )
+//                              //add the existing set to remove to this set
+//                              .doOnNext( results -> results.addToRemove( candidateCollector.getToRemove()
) );
+//                      } );
+//
+//
+//            return null;
+//        };
+//    }
 
 
     /**
@@ -238,20 +307,49 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search
         private final List<CandidateResult> candidates;
         private final List<CandidateResult> toRemove;
 
+        private final Map<Id, Integer> indexMapping;
 
-        public CandidateCollector( final int maxSize ) {
+        private final String cursor;
+
+
+        public CandidateCollector( final int maxSize, final String cursor ) {
+            this.cursor = cursor;
             candidates = new ArrayList<>( maxSize );
             toRemove = new ArrayList<>( maxSize );
+            indexMapping = new HashMap<>(maxSize);
         }
 
 
-        public void addCandidate( final CandidateResult candidate ) {
-            this.candidates.add( candidate );
-        }
 
+        public void insert(final CandidateResult newValue){
+
+            final Id candidateId = newValue.getId();
+
+            final Integer index = indexMapping.get( candidateId  );
+
+            if(index == null){
+                candidates.add( newValue );
+                indexMapping.put( candidateId, candidates.size()-1);
+                return;
+            }
+
+            //present, perform a comparison
+
+            final CandidateResult existing = candidates.get( index );
+
+
+            //the existing candidate is newer, so the new value is stale; add it to the remove list
+            if(UUIDComparator.staticCompare( existing.getVersion(), newValue.getVersion()
) > 0){
+                toRemove.add( newValue );
+            }
+
+            //remove the stale version from the list and put it in deindex
+            else{
+                candidates.remove( index );
+                candidates.add( newValue );
+                toRemove.add( existing );
+            }
 
-        public void addEmptyResults( final Collection<CandidateResult> stale ) {
-            this.toRemove.addAll( stale );
         }
 
 
@@ -268,13 +366,16 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search
 
     public static class SearchResults {
         private final List<Entity> entities;
-        private final List<CandidateResult> toRemove;
+        private final List<ScopedCandidateResult> toRemove;
+        private final int maxSize;
 
-        private String cursor;
+        private final String cursor;
 
 
-        public SearchResults( final int maxSize ) {
-            entities = new ArrayList<>( maxSize );
+        public SearchResults( final int maxSize, final String cursor ) {
+            this.maxSize = maxSize;
+            this.cursor = cursor;
+            this.entities = new ArrayList<>( maxSize );
             this.toRemove = new ArrayList<>( maxSize );
         }
 
@@ -284,20 +385,23 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search
         }
 
 
-        public void addToRemove( final Collection<CandidateResult> stale ) {
+        public void addToRemove( final Collection<ScopedCandidateResult> stale ) {
             this.toRemove.addAll( stale );
         }
 
 
-        public void addToRemove( final CandidateResult candidateResult ) {
-                   this.toRemove.add( candidateResult );
-               }
+        public void addToRemove( final ScopedCandidateResult candidateResult ) {
+            this.toRemove.add( candidateResult );
+        }
 
 
+        public String getCursor() {
+            return cursor;
+        }
 
 
-        public void setCursor( final String cursor ) {
-            this.cursor = cursor;
+        public boolean isFull(){
+            return entities.size() >= maxSize;
         }
     }
 
@@ -305,26 +409,26 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search
     /**
      * An observable that will perform a search and continually emit results while they exist.
      */
-    public static class ElasticSearchObservable implements Observable.OnSubscribe<CandidateResults>
{
+    public static class ElasticSearchObservable implements Observable.OnSubscribe<ScopedCandidateResults>
{
 
-        private final Func1<String, CandidateResults> fetchNextPage;
-        private final Func0<CandidateResults> fetchInitialResults;
+        private final Func1<String, ScopedCandidateResults> fetchNextPage;
+        private final Func0<ScopedCandidateResults> fetchInitialResults;
 
 
-        public ElasticSearchObservable( final Func0<CandidateResults> fetchInitialResults,
-                                        final Func1<String, CandidateResults> fetchNextPage
) {
+        public ElasticSearchObservable( final Func0<ScopedCandidateResults> fetchInitialResults,
+                                        final Func1<String, ScopedCandidateResults>
fetchNextPage ) {
             this.fetchInitialResults = fetchInitialResults;
             this.fetchNextPage = fetchNextPage;
         }
 
 
         @Override
-        public void call( final Subscriber<? super CandidateResults> subscriber ) {
+        public void call( final Subscriber<? super ScopedCandidateResults> subscriber
) {
 
             subscriber.onStart();
 
             try {
-                CandidateResults results = fetchInitialResults.call();
+                ScopedCandidateResults results = fetchInitialResults.call();
 
 
                 //emit our next page
@@ -332,13 +436,13 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search
                     subscriber.onNext( results );
 
                     //if we have no cursor, we're done
-                    if ( !results.hasCursor() ) {
+                    if ( !results.candidateResults.hasCursor() ) {
                         break;
                     }
 
 
                     //we have a cursor, get our results to emit for the next page
-                    results = fetchNextPage.call( results.getCursor() );
+                    results = fetchNextPage.call( results.candidateResults.getCursor() );
                 }
 
                 subscriber.onCompleted();
@@ -351,16 +455,51 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search
 
 
     /**
-     * A message that contains the candidate to keep, and the candidate toRemove
+     * A candidate result, along with the scope it was returned in
+     */
+    public static class ScopedCandidateResult{
+        private final IndexScope indexScope;
+        private final CandidateResult result;
+
+
+        public ScopedCandidateResult( final IndexScope indexScope, final CandidateResult
result ) {
+            this.indexScope = indexScope;
+            this.result = result;
+        }
+    }
+
+
+    /**
+     * Object that represents our candidate results, along with the scope it was searched
in
+     */
+    public static class ScopedCandidateResults {
+        private final IndexScope indexScope;
+        private CandidateResults candidateResults;
+
+
+        public ScopedCandidateResults( final IndexScope indexScope, final CandidateResults
candidateResults ) {
+            this.indexScope = indexScope;
+            this.candidateResults = candidateResults;
+        }
+    }
+
+
+    /**
+     * A message that contains the candidate to keep, and the candidate toRemove and the
scope they were searched in
      */
     public static class CandidateGroup {
+        private final IndexScope indexScope;
         private final CandidateResult toKeep;
         private final Collection<CandidateResult> toRemove;
+        private final String cursor;
 
 
-        public CandidateGroup( final CandidateResult toKeep, final Collection<CandidateResult>
toRemove ) {
+        public CandidateGroup( final IndexScope indexScope, final CandidateResult toKeep,
+                               final Collection<CandidateResult> toRemove, final String
cursor ) {
+            this.indexScope = indexScope;
             this.toKeep = toKeep;
             this.toRemove = toRemove;
+            this.cursor = cursor;
         }
     }
 
@@ -375,10 +514,5 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search
         }
     }
 
-    /***********************
-     * FROM HERE DOWN IS EXPERIMENTAL
-     *************************
-     */
-
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cee76da7/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
index 8a0c014..93de094 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
@@ -38,7 +38,7 @@ public class CollectUntil<T, R> implements Observable.Transformer<T,
R> {
     final Func1<R, Boolean> shortCircuitWhen;
 
 
-    public CollectUntil( final Func1<R, Boolean> shortCircuitWhen,  final Func0<R>
stateFactory, final Action2<R, ? super T> collector) {
+    public CollectUntil(  final Func0<R> stateFactory,final Func1<R, Boolean>
shortCircuitWhen,  final Action2<R, ? super T> collector) {
         this.stateFactory = stateFactory;
         this.collector = collector;
         this.shortCircuitWhen = shortCircuitWhen;


Mime
View raw message