Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E9F4017652 for ; Sat, 7 Mar 2015 01:06:02 +0000 (UTC) Received: (qmail 57537 invoked by uid 500); 7 Mar 2015 01:05:28 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 57479 invoked by uid 500); 7 Mar 2015 01:05:28 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 57184 invoked by uid 99); 7 Mar 2015 01:05:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 07 Mar 2015 01:05:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 50C8CE10E5; Sat, 7 Mar 2015 01:05:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: toddnine@apache.org To: commits@usergrid.apache.org Date: Sat, 07 Mar 2015 01:05:40 -0000 Message-Id: <8f14a31948e54382b03bbeb82827a232@git.apache.org> In-Reply-To: <12008f42d97b4b19be92046bc2e67817@git.apache.org> References: <12008f42d97b4b19be92046bc2e67817@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/20] incubator-usergrid git commit: changing buffer impl to queue changing buffer impl to queue Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5ad1a8c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5ad1a8c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5ad1a8c9 Branch: refs/heads/USERGRID-416 Commit: 5ad1a8c953fc3b8a6b48e6d0ca3c7c0da52ec8d1 Parents: 615a5af Author: Shawn Feldman Authored: Fri Mar 6 15:47:28 2015 -0700 Committer: Shawn Feldman Committed: Fri Mar 6 15:47:28 2015 -0700 ---------------------------------------------------------------------- .../persistence/index/IndexBufferProducer.java | 10 +++-- .../usergrid/persistence/index/IndexFig.java | 12 +++++- .../index/impl/EsIndexBufferConsumerImpl.java | 41 +++++++++++++++++++- .../index/impl/EsIndexBufferProducerImpl.java | 30 +++++++++----- .../persistence/index/impl/EntityIndexTest.java | 2 +- 5 files changed, 77 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java index 6338a0c..19d224c 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java @@ -24,13 +24,15 @@ import org.apache.usergrid.persistence.index.IndexOperationMessage; import rx.Observable; import rx.Subscriber; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + /** * Classy class class. */ -public interface IndexBufferProducer extends Observable.OnSubscribe { - - @Override - void call(Subscriber subscriber); +public interface IndexBufferProducer { BetterFuture put(IndexOperationMessage message); + + BlockingQueue getSource(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index 9893ca5..c6f08f6 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -51,6 +51,8 @@ public interface IndexFig extends GuicyFig { public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size"; + public static final String INDEX_QUEUE_SIZE = "elasticsearch.queue_size"; + public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout"; public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size"; @@ -127,7 +129,7 @@ public interface IndexFig extends GuicyFig { */ @Default("250") @Key( INDEX_BUFFER_TIMEOUT ) - int getIndexBufferTimeout(); + long getIndexBufferTimeout(); /** * size of the buffer to build up before you send results @@ -138,6 +140,14 @@ public interface IndexFig extends GuicyFig { int getIndexBufferSize(); /** + * size of the buffer to build up before you send results + * @return + */ + @Default("1000") + @Key( INDEX_QUEUE_SIZE ) + int getIndexQueueSize(); + + /** * Request batch size for ES * @return */ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index 52d1abb..625f4e7 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -42,12 +42,14 @@ import org.elasticsearch.client.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; +import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; import rx.schedulers.Schedulers; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Consumer for IndexOperationMessages @@ -63,6 +65,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { private final Timer flushTimer; private final Counter indexSizeCounter; private final Meter flushMeter; + private final Timer produceTimer; @Inject public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){ @@ -72,14 +75,48 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { this.config = config; this.failureMonitor = new FailureMonitorImpl(config,provider); this.client = provider.getClient(); + this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch"); + final BlockingQueue producerQueue = producer.getSource(); //batch up sets of some size and send them in batch - this.consumer = Observable.create(producer) + this.consumer = Observable.create(new Observable.OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + List drainList = new ArrayList<>(config.getIndexBufferSize() + 1); + do { + try { + Timer.Context timer = produceTimer.time(); + IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS); + if(polled!=null) { + drainList.add(polled); + producerQueue.drainTo(drainList, config.getIndexBufferSize()); + System.out.println("Consumer Thread" + Thread.currentThread().getName()); + for(IndexOperationMessage drained : drainList){ + subscriber.onNext(drained); + } + drainList.clear(); + } + timer.stop(); + + } catch (InterruptedException ie) { + log.error("failed to dequeue", ie); + } + } while (true); + } + }); + thread.setName("EsEntityIndex_Consumer"); + thread.start(); + } + }) .subscribeOn(Schedulers.io()) .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize()) .doOnNext(new Action1>() { @Override public void call(List containerList) { + System.out.println("Buffered Consumer Thread" + Thread.currentThread().getName()); if (containerList.size() > 0) { flushMeter.mark(containerList.size()); Timer.Context time = flushTimer.time(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java index 29f243b..d4d621f 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java @@ -20,15 +20,20 @@ package org.apache.usergrid.persistence.index.impl; import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.IndexBufferProducer; +import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.IndexOperationMessage; import rx.Subscriber; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + /** * Producer for index operation messages */ @@ -36,22 +41,27 @@ import rx.Subscriber; public class EsIndexBufferProducerImpl implements IndexBufferProducer { private final Counter indexSizeCounter; - private Subscriber subscriber; + private final ArrayBlockingQueue messages; + private final Timer timer; @Inject - public EsIndexBufferProducerImpl(MetricsFactory metricsFactory){ - this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class,"index.buffer.size"); - - } - @Override - public void call(Subscriber subscriber) { - this.subscriber = subscriber; + public EsIndexBufferProducerImpl(MetricsFactory metricsFactory,IndexFig fig){ + this.messages = new ArrayBlockingQueue<>(fig.getIndexQueueSize()*5); + this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size"); + this.timer = metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer"); } public BetterFuture put(IndexOperationMessage message){ - Preconditions.checkNotNull(message,"Message cannot be null"); + Preconditions.checkNotNull(message, "Message cannot be null"); indexSizeCounter.inc(message.getOperations().size()); - subscriber.onNext(message); + Timer.Context time = timer.time(); + messages.offer(message); + time.stop(); return message.getFuture(); } + + @Override + public BlockingQueue getSource() { + return messages; + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5ad1a8c9/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index 22aadc0..70ae8c5 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -96,7 +96,7 @@ public class EntityIndexTest extends BaseIT { long now = System.currentTimeMillis(); final int threads = 20; - final int size = 20; + final int size = 30; final EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); final IndexScope indexScope = new IndexScopeImpl(appId, "things"); final String entityType = "thing";