Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D5B8417E14 for ; Thu, 1 Oct 2015 16:34:18 +0000 (UTC) Received: (qmail 62555 invoked by uid 500); 1 Oct 2015 16:27:37 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 62497 invoked by uid 500); 1 Oct 2015 16:27:37 -0000 Mailing-List: contact commits-help@usergrid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.apache.org Delivered-To: mailing list commits@usergrid.apache.org Received: (qmail 62078 invoked by uid 99); 1 Oct 2015 16:27:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Oct 2015 16:27:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EC1D1E154D; Thu, 1 Oct 2015 16:27:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sfeldman@apache.org To: commits@usergrid.apache.org Date: Thu, 01 Oct 2015 16:27:44 -0000 Message-Id: <4234be4960144947bd71bed4ad932fd7@git.apache.org> In-Reply-To: <74f2f74819104da085f5c494de06c327@git.apache.org> References: <74f2f74819104da085f5c494de06c327@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/36] usergrid git commit: change consumer to producer change consumer to producer Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5218cda9 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5218cda9 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5218cda9 Branch: refs/heads/master Commit: 5218cda9d5732ae4ee2ee2ce1e7ae297975a7fad Parents: a5ece51 Author: Shawn Feldman Authored: Tue Sep 15 16:17:13 2015 -0600 Committer: Shawn Feldman Committed: Thu Sep 24 15:01:20 2015 -0600 ---------------------------------------------------------------------- .../persistence/index/guice/IndexModule.java | 2 +- .../index/impl/EsEntityIndexBatchImpl.java | 4 +- .../index/impl/EsEntityIndexFactoryImpl.java | 8 +- .../index/impl/EsEntityIndexImpl.java | 6 +- .../index/impl/EsIndexBufferConsumerImpl.java | 216 ------------------- .../index/impl/EsIndexProducerImpl.java | 209 ++++++++++++++++++ .../index/impl/IndexBufferConsumer.java | 40 ---- .../persistence/index/impl/IndexProducer.java | 37 ++++ .../index/impl/IndexRefreshCommandImpl.java | 5 +- 9 files changed, 257 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java index 7279174..46559ad 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java @@ -52,7 +52,7 @@ public abstract class IndexModule extends AbstractModule { bind(IndexCache.class).to(EsIndexCacheImpl.class); bind(IndexRefreshCommand.class).to(IndexRefreshCommandImpl.class); - bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton(); + bind(IndexProducer.class).to(EsIndexProducerImpl.class).asEagerSingleton(); //wire up the edg migration. A no-op ATM, but retained for future development http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index c11feed..64a1c6a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -41,7 +41,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { private final IndexAlias alias; private final IndexLocationStrategy indexLocationStrategy; - private final IndexBufferConsumer indexBatchBufferProducer; + private final IndexProducer indexBatchBufferProducer; private final EntityIndex entityIndex; private final ApplicationScope applicationScope; @@ -49,7 +49,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy, - final IndexBufferConsumer indexBatchBufferProducer, + final IndexProducer indexBatchBufferProducer, final EntityIndex entityIndex ) { this.indexLocationStrategy = locationStrategy; http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java index 869d079..b66fd40 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java @@ -37,7 +37,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ private final IndexFig config; private final IndexCache indexCache; private final EsProvider provider; - private final IndexBufferConsumer indexBufferConsumer; + private final IndexProducer indexProducer; private final MetricsFactory metricsFactory; private final IndexRefreshCommand refreshCommand; @@ -50,7 +50,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ config, refreshCommand, metricsFactory, - indexBufferConsumer, + indexProducer, locationStrategy ); index.initialize(); @@ -62,7 +62,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ public EsEntityIndexFactoryImpl( final IndexFig indexFig, final IndexCache indexCache, final EsProvider provider, - final IndexBufferConsumer indexBufferConsumer, + final IndexProducer indexProducer, final MetricsFactory metricsFactory, final IndexRefreshCommand refreshCommand @@ -70,7 +70,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ this.config = indexFig; this.indexCache = indexCache; this.provider = provider; - this.indexBufferConsumer = indexBufferConsumer; + this.indexProducer = indexProducer; this.metricsFactory = metricsFactory; this.refreshCommand = refreshCommand; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 1c63a7b..6317a69 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -63,8 +63,6 @@ import org.elasticsearch.index.query.*; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.aggregations.Aggregation; -import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; import org.slf4j.Logger; @@ -118,7 +116,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2; private final int cursorTimeout; private final long queryTimeout; - private final IndexBufferConsumer indexBatchBufferProducer; + private final IndexProducer indexBatchBufferProducer; private final FailureMonitorImpl failureMonitor; private final Timer aggregationTimer; @@ -133,7 +131,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { final IndexFig indexFig, final IndexRefreshCommand indexRefreshCommand, final MetricsFactory metricsFactory, - final IndexBufferConsumer indexBatchBufferProducer, + final IndexProducer indexBatchBufferProducer, final IndexLocationStrategy indexLocationStrategy ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java deleted file mode 100644 index d126b5d..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.index.impl; - - -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import com.codahale.metrics.Histogram; -import org.apache.usergrid.persistence.core.metrics.ObservableTimer; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.client.Client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.index.IndexFig; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import rx.Observable; -import rx.Subscriber; -import rx.schedulers.Schedulers; - - -/** - * Consumer for IndexOperationMessages - */ -@Singleton -public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { - private static final Logger log = LoggerFactory.getLogger( EsIndexBufferConsumerImpl.class ); - - private final IndexFig config; - private final FailureMonitorImpl failureMonitor; - private final Client client; - private final Timer flushTimer; - private final IndexFig indexFig; - private final Counter indexSizeCounter; - private final Histogram roundtripTimer; - private final Timer indexTimer; - - - private AtomicLong inFlight = new AtomicLong(); - - - @Inject - public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, - final MetricsFactory metricsFactory, final IndexFig indexFig ) { - this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush"); - this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size"); - this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle"); - - //wire up the gauge of inflight messages - metricsFactory.addGauge(EsIndexBufferConsumerImpl.class, "index_buffer.inflight", () -> inFlight.longValue()); - - - this.indexTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index" ); - - this.config = config; - this.failureMonitor = new FailureMonitorImpl(config, provider); - this.client = provider.getClient(); - this.indexFig = indexFig; - - - //batch up sets of some size and send them in batch - - } - - public Observable put( IndexOperationMessage message ) { - Preconditions.checkNotNull(message, "Message cannot be null"); - indexSizeCounter.inc(message.getDeIndexRequests().size()); - indexSizeCounter.inc(message.getIndexRequests().size()); - return processBatch(message); - } - - - /** - * Process the buffer of batches - * @param batch - * @return - */ - private Observable processBatch( final IndexOperationMessage batch ) { - - //take our stream of batches, then stream then into individual ops for consumption on ES - final Set indexOperationSet = batch.getIndexRequests(); - final Set deIndexOperationSet = batch.getDeIndexRequests(); - - final int indexOperationSetSize = indexOperationSet.size(); - final int deIndexOperationSetSize = deIndexOperationSet.size(); - - log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize); - - indexSizeCounter.dec(indexOperationSetSize); - indexSizeCounter.dec(deIndexOperationSetSize); - - final Observable index = Observable.from(batch.getIndexRequests()); - final Observable deIndex = Observable.from(batch.getDeIndexRequests()); - - final Observable batchOps = Observable.merge(index, deIndex); - - //buffer into the max size we can send ES and fire them all off until we're completed - final Observable requests = batchOps.buffer(indexFig.getIndexBatchSize()) - //flatten the buffer into a single batch execution - .flatMap(individualOps -> Observable.from(individualOps) - //collect them - .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> { - log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder); - batchOperation.doOperation(client, bulkRequestBuilder); - })) - //write them - .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder)); - - - //now that we've processed them all, ack the futures after our last batch comes through - final Observable processedIndexOperations = - requests.lastOrDefault(null).flatMap(lastRequest -> { - if (lastRequest != null) { - return Observable.just(batch); - } else { - return Observable.empty(); - } - }); - - //subscribe to the operations that generate requests on a new thread so that we can execute them quickly - //mark this as done - return processedIndexOperations.doOnNext(processedIndexOp -> { - roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime()); - }); - } - - - /* - - /** - * initialize request - */ - private BulkRequestBuilder initRequest() { - BulkRequestBuilder bulkRequest = client.prepareBulk(); - bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) ); - bulkRequest.setRefresh( config.isForcedRefresh() ); - return bulkRequest; - } - - - /** - * send bulk request - */ - private void sendRequest( BulkRequestBuilder bulkRequest ) { - //nothing to do, we haven't added anything to the index - if ( bulkRequest.numberOfActions() == 0 ) { - return; - } - - final BulkResponse responses; - - - final Timer.Context timer = indexTimer.time(); - - try { - responses = bulkRequest.execute().actionGet( ); - } catch ( Throwable t ) { - log.error( "Unable to communicate with elasticsearch" ); - failureMonitor.fail( "Unable to execute batch", t ); - throw t; - }finally{ - timer.stop(); - } - - failureMonitor.success(); - - boolean error = false; - - for ( BulkItemResponse response : responses ) { - - if ( response.isFailed() ) { - // log error and continue processing - log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(), - response.getType(), response.getIndex(), response.getFailureMessage() ); - - error = true; - } - } - - if ( error ) { - throw new RuntimeException( - "Error during processing of bulk index operations one of the responses failed. Check previous log " - + "entries" ); - } - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java new file mode 100644 index 0000000..a2c8663 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. + */ +package org.apache.usergrid.persistence.index.impl; + + +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import com.codahale.metrics.Histogram; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.index.IndexFig; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; + + +/** + * Consumer for IndexOperationMessages + */ +@Singleton +public class EsIndexProducerImpl implements IndexProducer { + private static final Logger log = LoggerFactory.getLogger( EsIndexProducerImpl.class ); + + private final IndexFig config; + private final FailureMonitorImpl failureMonitor; + private final Client client; + private final Timer flushTimer; + private final IndexFig indexFig; + private final Counter indexSizeCounter; + private final Histogram roundtripTimer; + private final Timer indexTimer; + + + private AtomicLong inFlight = new AtomicLong(); + + + @Inject + public EsIndexProducerImpl(final IndexFig config, final EsProvider provider, + final MetricsFactory metricsFactory, final IndexFig indexFig) { + this.flushTimer = metricsFactory.getTimer(EsIndexProducerImpl.class, "index_buffer.flush"); + this.indexSizeCounter = metricsFactory.getCounter(EsIndexProducerImpl.class, "index_buffer.size"); + this.roundtripTimer = metricsFactory.getHistogram(EsIndexProducerImpl.class, "index_buffer.message_cycle"); + + //wire up the gauge of inflight messages + metricsFactory.addGauge(EsIndexProducerImpl.class, "index_buffer.inflight", () -> inFlight.longValue()); + + + this.indexTimer = metricsFactory.getTimer( EsIndexProducerImpl.class, "index" ); + + this.config = config; + this.failureMonitor = new FailureMonitorImpl(config, provider); + this.client = provider.getClient(); + this.indexFig = indexFig; + + + //batch up sets of some size and send them in batch + + } + + public Observable put( IndexOperationMessage message ) { + Preconditions.checkNotNull(message, "Message cannot be null"); + indexSizeCounter.inc(message.getDeIndexRequests().size()); + indexSizeCounter.inc(message.getIndexRequests().size()); + return processBatch(message); + } + + + /** + * Process the buffer of batches + * @param batch + * @return + */ + private Observable processBatch( final IndexOperationMessage batch ) { + + //take our stream of batches, then stream then into individual ops for consumption on ES + final Set indexOperationSet = batch.getIndexRequests(); + final Set deIndexOperationSet = batch.getDeIndexRequests(); + + final int indexOperationSetSize = indexOperationSet.size(); + final int deIndexOperationSetSize = deIndexOperationSet.size(); + + log.debug("Emitting {} add and {} remove operations", indexOperationSetSize, deIndexOperationSetSize); + + indexSizeCounter.dec(indexOperationSetSize); + indexSizeCounter.dec(deIndexOperationSetSize); + + final Observable index = Observable.from(batch.getIndexRequests()); + final Observable deIndex = Observable.from(batch.getDeIndexRequests()); + + final Observable batchOps = Observable.merge(index, deIndex); + + //buffer into the max size we can send ES and fire them all off until we're completed + final Observable requests = batchOps.buffer(indexFig.getIndexBatchSize()) + //flatten the buffer into a single batch execution + .flatMap(individualOps -> Observable.from(individualOps) + //collect them + .collect(() -> initRequest(), (bulkRequestBuilder, batchOperation) -> { + log.debug("adding operation {} to bulkRequestBuilder {}", batchOperation, bulkRequestBuilder); + batchOperation.doOperation(client, bulkRequestBuilder); + })) + //write them + .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder)); + + + //now that we've processed them all, ack the futures after our last batch comes through + final Observable processedIndexOperations = + requests.lastOrDefault(null).flatMap(lastRequest -> { + if (lastRequest != null) { + return Observable.just(batch); + } else { + return Observable.empty(); + } + }); + + //subscribe to the operations that generate requests on a new thread so that we can execute them quickly + //mark this as done + return processedIndexOperations.doOnNext(processedIndexOp -> { + roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime()); + }); + } + + + /* + + /** + * initialize request + */ + private BulkRequestBuilder initRequest() { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) ); + bulkRequest.setRefresh( config.isForcedRefresh() ); + return bulkRequest; + } + + + /** + * send bulk request + */ + private void sendRequest( BulkRequestBuilder bulkRequest ) { + //nothing to do, we haven't added anything to the index + if ( bulkRequest.numberOfActions() == 0 ) { + return; + } + + final BulkResponse responses; + + + final Timer.Context timer = indexTimer.time(); + + try { + responses = bulkRequest.execute().actionGet( ); + } catch ( Throwable t ) { + log.error( "Unable to communicate with elasticsearch" ); + failureMonitor.fail( "Unable to execute batch", t ); + throw t; + }finally{ + timer.stop(); + } + + failureMonitor.success(); + + boolean error = false; + + for ( BulkItemResponse response : responses ) { + + if ( response.isFailed() ) { + // log error and continue processing + log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(), + response.getType(), response.getIndex(), response.getFailureMessage() ); + + error = true; + } + } + + if ( error ) { + throw new RuntimeException( + "Error during processing of bulk index operations one of the responses failed. Check previous log " + + "entries" ); + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java deleted file mode 100644 index cfeb505..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.index.impl; - - -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import rx.Observable; - -import java.util.List; - - -/** - * Buffer index requests - */ -public interface IndexBufferConsumer { - - - /** - * Put this operation into our collapsing bufer - * @param message - * @return - */ - Observable put(IndexOperationMessage message); - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java new file mode 100644 index 0000000..ba7027e --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexProducer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. + */ +package org.apache.usergrid.persistence.index.impl; + + +import rx.Observable; + + +/** + * Buffer index requests + */ +public interface IndexProducer { + + + /** + * Put this operation into our collapsing bufer + * @param message + * @return + */ + Observable put(IndexOperationMessage message); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/5218cda9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 7b9bc5d..01942a8 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.index.impl; -import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -58,7 +57,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { private final IndexCache indexCache; private final EsProvider esProvider; - private final IndexBufferConsumer producer; + private final IndexProducer producer; private final IndexFig indexFig; private final Timer timer; @@ -66,7 +65,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { @Inject public IndexRefreshCommandImpl( final EsProvider esProvider, - final IndexBufferConsumer producer, + final IndexProducer producer, final IndexFig indexFig, final MetricsFactory metricsFactory, final IndexCache indexCache ) {