usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [1/2] usergrid git commit: Changes flow so that ack is a future step of the subscription of the flush to ES. This way, if ES fails and throws an exception, the message will not be acked.
Date Thu, 01 Oct 2015 21:59:05 GMT
Repository: usergrid
Updated Branches:
  refs/heads/2.1-release f3d5fb552 -> 8ea74b058


Changes flow so that ack is a future step of the subscription of the flush to ES.  This way,
if ES fails and throws an exception, the message will not be acked.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7cc32c3c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7cc32c3c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7cc32c3c

Branch: refs/heads/2.1-release
Commit: 7cc32c3cb5fc8f055629b65d500b5cd5f292161e
Parents: 62f41d0
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Oct 1 15:58:37 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Oct 1 15:58:37 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 44 ++++++++++----------
 .../index/impl/EsIndexProducerImpl.java         |  9 ++--
 2 files changed, 28 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc32c3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d1670ed..2ba177a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -270,24 +270,32 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         masterObservable
             //remove unsuccessful
-            .filter(indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage().isPresent())
+            .filter( indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage()
+                                                                                       .isPresent()
)
             //take the max
-            .buffer(MAX_TAKE)
+            .buffer( MAX_TAKE )
             //map them to index results and return them
-            .map(indexEventResults -> {
+            .flatMap( indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
-                indexEventResults.stream()
-                    .forEach(indexEventResult -> combined.ingest(indexEventResult.getIndexOperationMessage().get()));
-                indexProducer.put(combined).subscribe();//execute the index operation
-                return indexEventResults;
-            })
+                indexEventResults.stream().forEach(
+                    indexEventResult -> combined.ingest( indexEventResult.getIndexOperationMessage().get()
) );
+
+                //ack after successful completion of the operation.
+                return indexProducer.put( combined ).flatMap(
+                    operationResult -> Observable.from( indexEventResults ) )
+                    //ack each message, but only if we didn't error.  If we did, we'll want
to log it and
+                                    .map( indexEventResult -> {
+                                        ack( indexEventResult.queueMessage );
+                                        return indexEventResult;
+                                    } ).doOnError( error ->
+                    {
+                        logger.error( "Unable to write messages to elasticsearch.  Messages
not acked", error );
+                    });
+            } )
                 //flat map the ops so they are back to individual
-            .flatMap(indexEventResults -> Observable.from(indexEventResults))
-            //ack each message
-            .map(indexEventResult -> {
-                ack(indexEventResult.queueMessage);
-                return indexEventResult;
-            })
+
+            //overwhelms ES
+            .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
             .subscribe();
     }
 
@@ -559,14 +567,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    /**
-     * Subscribes to the observable and acks the message via SQS on completion
-     * @param observable
-     * @param message
-     */
-    private void subscribeAndAck( final Observable<?> observable, final QueueMessage
message ){
-       observable.doOnCompleted( ()-> ack(message)  ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler()
).subscribe();
-    }
     public class IndexEventResult{
         private final QueueMessage queueMessage;
         private final Optional<IndexOperationMessage> indexOperationMessage;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc32c3c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 2b36fc8..869b75a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -178,7 +178,7 @@ public class EsIndexProducerImpl implements IndexProducer {
         try {
             responses = bulkRequest.execute().actionGet( );
         } catch ( Throwable t ) {
-            log.error( "Unable to communicate with elasticsearch" );
+            log.error( "Unable to communicate with elasticsearch", t );
             failureMonitor.fail( "Unable to execute batch", t );
             throw t;
         }finally{
@@ -189,6 +189,8 @@ public class EsIndexProducerImpl implements IndexProducer {
 
         boolean error = false;
 
+        final StringBuilder errorString = new StringBuilder(  );
+
         for ( BulkItemResponse response : responses ) {
 
             if ( response.isFailed() ) {
@@ -197,13 +199,14 @@ public class EsIndexProducerImpl implements IndexProducer {
                     response.getType(), response.getIndex(), response.getFailureMessage()
);
 
                 error = true;
+
+                errorString.append( response.getFailureMessage() ).append( "\n" );
             }
         }
 
         if ( error ) {
             throw new RuntimeException(
-                "Error during processing of bulk index operations one of the responses failed.
 Check previous log "
-                    + "entries" );
+                "Error during processing of bulk index operations one of the responses failed.
\n" + errorString);
         }
     }
 }


Mime
View raw message