Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 83AE217DE2 for ; Thu, 1 Oct 2015 16:27:46 +0000 (UTC) Received: (qmail 62067 invoked by uid 500); 1 Oct 2015 16:27:37 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 61995 invoked by uid 500); 1 Oct 2015 16:27:37 -0000 Mailing-List: contact commits-help@usergrid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.apache.org Delivered-To: mailing list commits@usergrid.apache.org Received: (qmail 61908 invoked by uid 99); 1 Oct 2015 16:27:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Oct 2015 16:27:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CACA4E1536; Thu, 1 Oct 2015 16:27:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sfeldman@apache.org To: commits@usergrid.apache.org Date: Thu, 01 Oct 2015 16:27:39 -0000 Message-Id: <956c66eab04b4bff9b86042f1ea064a3@git.apache.org> In-Reply-To: <74f2f74819104da085f5c494de06c327@git.apache.org> References: <74f2f74819104da085f5c494de06c327@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/36] usergrid git commit: remove batch consumer remove batch consumer Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76816007 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76816007 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76816007 Branch: refs/heads/master Commit: 76816007f34317e54659aaad3ac5c1bf59e8580d Parents: ce5a96f Author: Shawn Feldman Authored: Tue Sep 15 13:11:12 2015 -0600 Committer: Shawn Feldman Committed: Thu Sep 24 15:00:13 2015 -0600 ---------------------------------------------------------------------- .../usergrid/persistence/index/IndexFig.java | 14 ---- .../index/impl/EsIndexBufferConsumerImpl.java | 87 +++++++++----------- .../index/impl/IndexBufferConsumer.java | 2 + .../index/impl/IndexRefreshCommandImpl.java | 1 + 4 files changed, 44 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index 5997029..db1ef3d 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -153,21 +153,7 @@ public interface IndexFig extends GuicyFig { @Default( "2" ) int getIndexCacheMaxWorkers(); - /** - * The maximum time to wait before the buffer flushes and sends index write requests to Elasticsearch. - * This is used so the application doesn't wait forever for the buffer to reach its size before writing - * data to Elasticsearch. - */ - @Default( "250" ) - @Key( INDEX_BUFFER_TIMEOUT ) - long getIndexBufferTimeout(); - /** - * The maximum buffer size to use before sending index write requests to Elasticsearch. - */ - @Default( "1000" ) - @Key( INDEX_BUFFER_SIZE ) - int getIndexBufferSize(); /** * The number of worker threads used for flushing batches of index write requests http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index 14f2b6f..91fab2f 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -100,12 +100,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { public Observable put( IndexOperationMessage message ) { Preconditions.checkNotNull(message, "Message cannot be null"); - indexSizeCounter.inc( message.getDeIndexRequests().size() ); - indexSizeCounter.inc( message.getIndexRequests().size() ); + indexSizeCounter.inc(message.getDeIndexRequests().size()); + indexSizeCounter.inc(message.getIndexRequests().size()); Timer.Context time = offerTimer.time(); - bufferProducer.send( message ); + bufferProducer.send(message); time.stop(); - return message.observable(); + return message.observable(); } @@ -114,22 +114,20 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { */ private void startSubscription() { - + //buffer on our new thread with a timeout final Observable observable = Observable.create(bufferProducer); - //buffer on our new thread with a timeout - observable.buffer( indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, indexFig.getIndexBufferSize(), - Schedulers.io() ).flatMap( indexOpBuffer -> { + observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> { //hand off to processor in new observable thread so we can continue to buffer faster - return Observable.just( indexOpBuffer ).flatMap( - indexOpBufferObservable -> ObservableTimer.time( processBatch( indexOpBufferObservable ),flushTimer ) + return Observable.just(indexOpBuffer).flatMap( + indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer) ) //use the I/O scheduler for thread re-use and efficiency in context switching then use our concurrent // flatmap count or higher throughput of batches once buffered - .subscribeOn( Schedulers.io() ); - }, indexFig.getIndexFlushWorkerCount() ) + .subscribeOn(Schedulers.io()); + }, indexFig.getIndexFlushWorkerCount()) //start in the background .subscribe(); } @@ -137,63 +135,60 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { /** * Process the buffer of batches - * @param batches + * @param batch * @return */ - private Observable processBatch( final List batches ) { + private Observable processBatch( final IndexOperationMessage batch ) { - final Observable indexOps = Observable.from( batches ); - //take our stream of batches, then stream then into individual ops for consumption on ES - final Observable batchOps = indexOps.flatMap( batch -> { - final Set indexOperationSet = batch.getIndexRequests(); - final Set deIndexOperationSet = batch.getDeIndexRequests(); - final int indexOperationSetSize = indexOperationSet.size(); - final int deIndexOperationSetSize = deIndexOperationSet.size(); + final Set indexOperationSet = batch.getIndexRequests(); + final Set deIndexOperationSet = batch.getDeIndexRequests(); + + final int indexOperationSetSize = indexOperationSet.size(); + final int deIndexOperationSetSize = deIndexOperationSet.size(); - log.debug( "Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize ); + log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize); - indexSizeCounter.dec( indexOperationSetSize ); - indexSizeCounter.dec( deIndexOperationSetSize ); + indexSizeCounter.dec(indexOperationSetSize); + indexSizeCounter.dec(deIndexOperationSetSize); - final Observable index = Observable.from( batch.getIndexRequests() ); - final Observable deIndex = Observable.from( batch.getDeIndexRequests() ); + final Observable index = Observable.from(batch.getIndexRequests()); + final Observable deIndex = Observable.from(batch.getDeIndexRequests()); - return Observable.merge( index, deIndex ); - } ); + final Observable batchOps = Observable.merge(index, deIndex); //buffer into the max size we can send ES and fire them all off until we're completed - final Observable requests = batchOps.buffer( indexFig.getIndexBatchSize() ) + final Observable requests = batchOps.buffer(indexFig.getIndexBatchSize()) //flatten the buffer into a single batch execution - .flatMap( individualOps -> Observable.from( individualOps ) + .flatMap(individualOps -> Observable.from(individualOps) //collect them - .collect( () -> initRequest(), ( bulkRequestBuilder, batchOperation ) -> { - log.debug( "adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder ); - batchOperation.doOperation( client, bulkRequestBuilder ); - } ) ) + .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> { + log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder); + batchOperation.doOperation(client, bulkRequestBuilder); + })) //write them - .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ); + .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder)); //now that we've processed them all, ack the futures after our last batch comes through final Observable processedIndexOperations = - requests.lastOrDefault(null).flatMap( lastRequest ->{ - if(lastRequest!=null){ - return Observable.from( batches ) ; - }else{ + requests.lastOrDefault(null).flatMap(lastRequest -> { + if (lastRequest != null) { + return Observable.just(batch); + } else { return Observable.empty(); } }); //subscribe to the operations that generate requests on a new thread so that we can execute them quickly //mark this as done - return processedIndexOperations.doOnNext( processedIndexOp -> { - processedIndexOp.done(); - roundtripTimer.update( System.currentTimeMillis() - processedIndexOp.getCreationTime() ); - } ); + return processedIndexOperations.doOnNext(processedIndexOp -> { + processedIndexOp.done(); + roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime()); + }); } @@ -266,12 +261,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { /** * Send the data through the buffer */ - public void send( final IndexOperationMessage indexOp ) { + public void send( final IndexOperationMessage indexOps ) { try { - subscriber.onNext( indexOp ); + subscriber.onNext( indexOps ); }catch(Exception e){ //re-throws so the caller can determine failover - log.error( "Unable to process message for indexOp {}, error follows.", indexOp, e ); + log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e ); throw e; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java index e769455..df2119c 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java @@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.index.impl; import rx.Observable; +import java.util.List; + /** * Buffer index requests http://git-wip-us.apache.org/repos/asf/usergrid/blob/76816007/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 70220d0..7b9bc5d 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.index.impl; +import java.util.Collections; import java.util.Map; import java.util.UUID;