Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DCCEB1836A for ; Mon, 5 Oct 2015 22:28:29 +0000 (UTC) Received: (qmail 19843 invoked by uid 500); 5 Oct 2015 22:28:29 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 19821 invoked by uid 500); 5 Oct 2015 22:28:29 -0000 Mailing-List: contact commits-help@usergrid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.apache.org Delivered-To: mailing list commits@usergrid.apache.org Received: (qmail 19812 invoked by uid 99); 5 Oct 2015 22:28:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Oct 2015 22:28:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2B433E0837; Mon, 5 Oct 2015 22:28:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sfeldman@apache.org To: commits@usergrid.apache.org Date: Mon, 05 Oct 2015 22:28:28 -0000 Message-Id: <1cbde156fd584ae0a042d45cd04e8cbb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] usergrid git commit: filter some ops out, check for queue overflow Repository: usergrid Updated Branches: refs/heads/review-observable 5f20ece66 -> 1a1d42e1f filter some ops out, check for queue overflow Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f35d01c7 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f35d01c7 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f35d01c7 Branch: refs/heads/review-observable Commit: f35d01c730998d45ca6feb5edf9138f04d185951 Parents: 5f20ece Author: Shawn Feldman Authored: Mon Oct 5 13:59:10 2015 -0600 Committer: Shawn Feldman Committed: Mon Oct 5 13:59:10 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 31 ++++++++++++-------- .../usergrid/persistence/index/IndexFig.java | 3 ++ .../index/impl/EsIndexProducerImpl.java | 7 +++++ .../queue/impl/QueueManagerFactoryImpl.java | 2 +- 4 files changed, 29 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 92faed4..e215d48 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -239,7 +239,7 @@ public class AmazonAsyncEventService implements AsyncEventService { if (event == null) { logger.error("AsyncEvent type or event is null!"); - return Observable.just(new IndexEventResult(message, Optional.absent(), false)); + return Observable.just(new IndexEventResult(message, Optional.absent(), true)); } try { //merge each operation to a master observable; @@ -254,10 +254,10 @@ public class AmazonAsyncEventService implements AsyncEventService { } else if (event instanceof InitializeApplicationIndexEvent) { //does not return observable handleInitializeApplicationIndex(message); - return Observable.just(new IndexEventResult(message, Optional.absent(), true)); + return Observable.just(new IndexEventResult(message, Optional.absent(), false)); } else { logger.error("Unknown EventType: {}", event); - return Observable.just(new IndexEventResult(message, Optional.absent(), false)); + return Observable.just(new IndexEventResult(message, Optional.absent(), true)); } }catch (Exception e){ logger.error("Failed to index entity", e,message); @@ -270,20 +270,25 @@ public class AmazonAsyncEventService implements AsyncEventService { return masterObservable //remove unsuccessful - .filter( indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage() - .isPresent() ) + .filter( indexEventResult -> indexEventResult.shouldProcess() ) //take the max .buffer( MAX_TAKE ) //map them to index results and return them .flatMap( indexEventResults -> { IndexOperationMessage combined = new IndexOperationMessage(); indexEventResults.stream().forEach( - indexEventResult -> combined.ingest( indexEventResult.getIndexOperationMessage().get() ) ); + indexEventResult ->{ + if(indexEventResult.getIndexOperationMessage().isPresent()) { + combined.ingest(indexEventResult.getIndexOperationMessage().get()); + } + } ); + //ack after successful completion of the operation. - return indexProducer.put( combined ).flatMap( operationResult -> Observable.from( indexEventResults ) ) + return indexProducer.put( combined ) + .flatMap( operationResult -> Observable.from( indexEventResults ) ) //ack each message, but only if we didn't error. If we did, we'll want to log it and - .map( indexEventResult -> { + .map( indexEventResult -> { ack( indexEventResult.queueMessage ); return indexEventResult; } ); @@ -562,21 +567,21 @@ public class AmazonAsyncEventService implements AsyncEventService { public class IndexEventResult{ private final QueueMessage queueMessage; private final Optional indexOperationMessage; - private final boolean success; + private final boolean shouldProcess; - public IndexEventResult(QueueMessage queueMessage, Optional indexOperationMessage ,boolean success){ + public IndexEventResult(QueueMessage queueMessage, Optional indexOperationMessage ,boolean shouldProcess){ this.queueMessage = queueMessage; this.indexOperationMessage = indexOperationMessage; - this.success = success; + this.shouldProcess = shouldProcess; } public QueueMessage getQueueMessage() { return queueMessage; } - public boolean success() { - return success; + public boolean shouldProcess() { + return shouldProcess; } public Optional getIndexOperationMessage() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index 4f35730..e093d7d 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -196,4 +196,7 @@ public interface IndexFig extends GuicyFig { long getWriteTimeout(); + @Default("1000") + @Key( "elasticsearch_queue_error_sleep_ms" ) + long getSleepTimeForQueueError(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java index 869b75a..828027c 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java @@ -205,6 +205,13 @@ public class EsIndexProducerImpl implements IndexProducer { } if ( error ) { + if(errorString.lastIndexOf("rejected execution (queue capacity")>=0){ + try{ + Thread.sleep(indexFig.getSleepTimeForQueueError()); + }catch (InterruptedException ie){ + //move on + } + } throw new RuntimeException( "Error during processing of bulk index operations one of the responses failed. \n" + errorString); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java index d0ed1ef..a1940d0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java @@ -47,7 +47,7 @@ public class QueueManagerFactoryImpl implements QueueManagerFactory { } @Override public QueueManager getQueueManager(QueueScope scope) { - if(queueFig.overrideQueueForDefault()){ + if(true==false){ QueueManager manager = defaultManager.get(scope.getName()); if(manager==null){ manager = new DefaultQueueManager();