Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5C2521093C for ; Tue, 3 Mar 2015 20:00:42 +0000 (UTC) Received: (qmail 45950 invoked by uid 500); 3 Mar 2015 20:00:42 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 45901 invoked by uid 500); 3 Mar 2015 20:00:42 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 45164 invoked by uid 99); 3 Mar 2015 20:00:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 20:00:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 34F1DE1083; Tue, 3 Mar 2015 20:00:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: grey@apache.org To: commits@usergrid.apache.org Date: Tue, 03 Mar 2015 20:01:08 -0000 Message-Id: <04404cf0c5de4ff89dc3298f087fd412@git.apache.org> In-Reply-To: <31613d817b9b4e14abf7adb913ddece6@git.apache.org> References: <31613d817b9b4e14abf7adb913ddece6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/37] incubator-usergrid git commit: indexbuffer: update method names indexbuffer: update method names Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/05f52513 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/05f52513 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/05f52513 Branch: refs/heads/USERGRID-422 Commit: 05f52513e491e27d9bb0d26374e315aca9b00f8d Parents: 382610d Author: Shawn Feldman Authored: Thu Feb 26 14:28:20 2015 -0700 Committer: Shawn Feldman Committed: Thu Feb 26 14:28:20 2015 -0700 ---------------------------------------------------------------------- .../index/IndexOperationMessage.java | 14 ++++++- .../index/impl/EsEntityIndexBatchImpl.java | 4 +- .../index/impl/EsIndexBufferConsumerImpl.java | 40 ++++++++------------ .../index/impl/EsIndexBufferProducerImpl.java | 4 +- 4 files changed, 32 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java index 3a0a702..501233e 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java @@ -41,12 +41,22 @@ public class IndexOperationMessage { }); } - public void add(ShardReplicationOperationRequestBuilder builder){ + public void addOperation(ShardReplicationOperationRequestBuilder builder){ builders.add(builder); } - public ConcurrentLinkedQueue getBuilder(){ + + /** + * return operations for the message + * @return + */ + public ConcurrentLinkedQueue getOperations(){ return builders; } + + /** + * return the promise + * @return + */ public BetterFuture getFuture(){ return containerFuture; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index 2e0fb56..c2a3fdc 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -127,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { final String entityType = entity.getId().getType(); IndexRequestBuilder builder = client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap ); - container.add(builder); + container.addOperation(builder); return this; } @@ -173,7 +173,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { public Object call(String index) { try { DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh); - container.add(builder); + container.addOperation(builder); }catch (Exception e){ log.error("failed to deindex",e); throw e; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index eaca9bd..09c7097 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -45,7 +45,6 @@ import rx.schedulers.Schedulers; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * Consumer for IndexOperationMessages @@ -69,22 +68,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { this.failureMonitor = new FailureMonitorImpl(config,provider); this.client = provider.getClient(); - final AtomicLong queueSize = new AtomicLong(); //batch up sets of some size and send them in batch this.consumer = Observable.create(producer) .subscribeOn(Schedulers.io()) - .doOnNext(new Action1() { - @Override - public void call(IndexOperationMessage requestBuilderContainer) { - queueSize.addAndGet(requestBuilderContainer.getBuilder().size()); - } - }) .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize()) .doOnNext(new Action1>() { @Override public void call(List containerList) { flushTimer.time(); - if(containerList.size()>0){ + if (containerList.size() > 0) { execute(containerList); } } @@ -107,13 +99,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { .flatMap(new Func1>() { @Override public Observable call(IndexOperationMessage operationMessage) { - return Observable.from(operationMessage.getBuilder()) - .map(new Func1() { - @Override - public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) { - return builder; - } - }); + return Observable.from(operationMessage.getOperations()); } }); @@ -123,17 +109,21 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { .doOnNext(new Action1>() { @Override public void call(List builders) { - final BulkRequestBuilder bulkRequest = initRequest(); - for (ShardReplicationOperationRequestBuilder builder : builders) { - indexSizeCounter.dec(); - if (builder instanceof IndexRequestBuilder) { - bulkRequest.add((IndexRequestBuilder) builder); - } - if (builder instanceof DeleteRequestBuilder) { - bulkRequest.add((DeleteRequestBuilder) builder); + try { + final BulkRequestBuilder bulkRequest = initRequest(); + for (ShardReplicationOperationRequestBuilder builder : builders) { + indexSizeCounter.dec(); + if (builder instanceof IndexRequestBuilder) { + bulkRequest.add((IndexRequestBuilder) builder); + } + if (builder instanceof DeleteRequestBuilder) { + bulkRequest.add((DeleteRequestBuilder) builder); + } } + sendRequest(bulkRequest); + }catch (Exception e){ + log.error("Failed while sending bulk",e); } - sendRequest(bulkRequest); } }) .toBlocking().lastOrDefault(null); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05f52513/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java index 791cea8..29f243b 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.index.impl; import com.codahale.metrics.Counter; +import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.usergrid.persistence.core.future.BetterFuture; @@ -48,7 +49,8 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer { } public BetterFuture put(IndexOperationMessage message){ - indexSizeCounter.inc(message.getBuilder().size()); + Preconditions.checkNotNull(message,"Message cannot be null"); + indexSizeCounter.inc(message.getOperations().size()); subscriber.onNext(message); return message.getFuture(); }