usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/3] incubator-usergrid git commit: Fixes subscription death on error
Date Wed, 12 Aug 2015 21:21:59 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev e3a4a9571 -> d7310c503


Fixes subscription death on error


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3f821f58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3f821f58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3f821f58

Branch: refs/heads/two-dot-o-dev
Commit: 3f821f5803c467b45148c51ba192fd957146dc1e
Parents: e3a4a95
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Aug 12 14:59:46 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Aug 12 14:59:46 2015 -0600

----------------------------------------------------------------------
 .../index/impl/EsIndexBufferConsumerImpl.java          | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f821f58/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 457a900..ed70c62 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -177,7 +177,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     batchOperation.doOperation( client, bulkRequestBuilder );
                 } ) )
                 //write them
-            .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ).doOnError( t -> log.error( "Unable to process batches", t ) );
+            .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) );
 
 
         //now that we've processed them all, ack the futures after our last batch comes through
@@ -194,9 +194,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         //mark this as done
         return processedIndexOperations.doOnNext( processedIndexOp -> {
                 processedIndexOp.done();
-                roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
-            }
-        ).doOnError(t -> log.error("Unable to ack futures", t));
+                roundtripTimer.update( System.currentTimeMillis() - processedIndexOp.getCreationTime() );
+            } );
     }
 
 
@@ -270,7 +269,11 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
          * Send the data through the buffer
          */
         public void send( final IndexOperationMessage indexOp ) {
-            subscriber.onNext( indexOp );
+            try {
+                subscriber.onNext( indexOp );
+            }catch(Exception e){
+                log.error( "Unable to process message for indexOp {}, error follows.", indexOp, e );
+            }
         }
 
 


Mime
View raw message