Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 73AFF1754D for ; Tue, 3 Mar 2015 00:21:50 +0000 (UTC) Received: (qmail 5022 invoked by uid 500); 3 Mar 2015 00:21:50 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 4959 invoked by uid 500); 3 Mar 2015 00:21:50 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 4448 invoked by uid 99); 3 Mar 2015 00:21:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 00:21:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E0740E104E; Tue, 3 Mar 2015 00:21:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: grey@apache.org To: commits@usergrid.apache.org Date: Tue, 03 Mar 2015 00:22:07 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/27] incubator-usergrid git commit: index message comments index message comments separate into classes switch to producer consumer model removing static class Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/489b2d78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/489b2d78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/489b2d78 Branch: refs/heads/USERGRID-280 Commit: 489b2d78da3167f63d447d5eb8b5048e744b51aa Parents: 0621294 Author: Shawn Feldman Authored: Wed Feb 25 18:42:29 2015 -0700 Committer: Shawn Feldman Committed: Thu Feb 26 09:42:24 2015 -0700 ---------------------------------------------------------------------- .../persistence/index/IndexBufferConsumer.java | 26 +++ .../persistence/index/IndexBufferProducer.java | 36 ++++ .../index/IndexOperationMessage.java | 5 +- .../persistence/index/guice/IndexModule.java | 11 +- .../index/impl/EsEntityIndexBatchImpl.java | 8 +- .../index/impl/EsEntityIndexImpl.java | 10 +- .../index/impl/EsIndexBufferConsumerImpl.java | 193 +++++++++++++++++ .../index/impl/EsIndexBufferProducerImpl.java | 55 +++++ .../index/impl/IndexBatchBufferImpl.java | 206 ------------------- 9 files changed, 325 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java new file mode 100644 index 0000000..ac7489c --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java @@ -0,0 +1,26 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.index; + +/** + * Classy class class. + */ +public interface IndexBufferConsumer { +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java new file mode 100644 index 0000000..6338a0c --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java @@ -0,0 +1,36 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.index; + +import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.index.IndexOperationMessage; +import rx.Observable; +import rx.Subscriber; + +/** + * Classy class class. + */ +public interface IndexBufferProducer extends Observable.OnSubscribe { + + @Override + void call(Subscriber subscriber); + + BetterFuture put(IndexOperationMessage message); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java index 5acb17e..3a0a702 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java @@ -24,7 +24,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; /** - * Classy class class. + * Container for index operations. */ public class IndexOperationMessage { private final ConcurrentLinkedQueue builders; @@ -47,9 +47,6 @@ public class IndexOperationMessage { public ConcurrentLinkedQueue getBuilder(){ return builders; } - public void done(){ - containerFuture.done(); - } public BetterFuture getFuture(){ return containerFuture; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java index 5af148a..d9a14c9 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java @@ -19,14 +19,12 @@ package org.apache.usergrid.persistence.index.guice; -import org.apache.usergrid.persistence.index.IndexBatchBuffer; -import org.apache.usergrid.persistence.index.IndexFig; +import org.apache.usergrid.persistence.index.*; import com.google.inject.AbstractModule; import com.google.inject.assistedinject.FactoryModuleBuilder; -import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl; -import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl; +import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl; +import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl; import org.safehaus.guicyfig.GuicyFigModule; @@ -42,7 +40,8 @@ public class IndexModule extends AbstractModule { .implement(EntityIndex.class, EsEntityIndexImpl.class) .build(EntityIndexFactory.class)); - bind(IndexBatchBuffer.class).to(IndexBatchBufferImpl.class); + bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class); + bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index 455dba6..2e0fb56 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -76,18 +76,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { private final IndexIdentifier.IndexAlias alias; private final IndexIdentifier indexIdentifier; - private final IndexBatchBuffer indexBatchBuffer; + private final IndexBufferProducer indexBatchBufferProducer; private final AliasedEntityIndex entityIndex; private IndexOperationMessage container; - public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer, + public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBufferProducer indexBatchBufferProducer, final IndexFig config, final AliasedEntityIndex entityIndex ) { this.applicationScope = applicationScope; this.client = client; - this.indexBatchBuffer = indexBatchBuffer; + this.indexBatchBufferProducer = indexBatchBufferProducer; this.entityIndex = entityIndex; this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope); this.alias = indexIdentifier.getAlias(); @@ -204,7 +204,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { public BetterFuture execute() { IndexOperationMessage tempContainer = container; container = new IndexOperationMessage(); - return indexBatchBuffer.put(tempContainer); + return indexBatchBufferProducer.put(tempContainer); } /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 693b168..ad638c6 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -87,7 +87,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { private final IndexIdentifier.IndexAlias alias; private final IndexIdentifier indexIdentifier; - private final IndexBatchBuffer indexBatchBuffer; + private final IndexBufferProducer indexBatchBufferProducer; /** * We purposefully make this per instance. Some indexes may work, while others may fail @@ -119,8 +119,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { @Inject - public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBatchBuffer indexBatchBuffer, final EsProvider provider, final EsIndexCache indexCache) { - this.indexBatchBuffer = indexBatchBuffer; + public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final EsIndexCache indexCache) { + this.indexBatchBufferProducer = indexBatchBufferProducer; ValidationUtils.validateApplicationScope( appScope ); this.applicationScope = appScope; this.esProvider = provider; @@ -282,7 +282,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { @Override public EntityIndexBatch createBatch() { EntityIndexBatch batch = new EsEntityIndexBatchImpl( - applicationScope, esProvider.getClient(),indexBatchBuffer, config, this ); + applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this ); return batch; } @@ -434,7 +434,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex { public void refresh() { - BetterFuture future = indexBatchBuffer.put(new IndexOperationMessage()); + BetterFuture future = indexBatchBufferProducer.put(new IndexOperationMessage()); future.get(); //loop through all batches and retrieve promises and call get http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java new file mode 100644 index 0000000..eaca9bd --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -0,0 +1,193 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.index.impl; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.index.IndexBufferConsumer; +import org.apache.usergrid.persistence.index.IndexBufferProducer; +import org.apache.usergrid.persistence.index.IndexFig; +import org.apache.usergrid.persistence.index.IndexOperationMessage; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder; +import org.elasticsearch.client.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.schedulers.Schedulers; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Consumer for IndexOperationMessages + */ +@Singleton +public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { + private static final Logger log = LoggerFactory.getLogger(EsIndexBufferConsumerImpl.class); + + private final IndexFig config; + private final FailureMonitorImpl failureMonitor; + private final Client client; + private final Observable> consumer; + private final Timer flushTimer; + private final Counter indexSizeCounter; + + @Inject + public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){ + this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush"); + this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size"); + this.config = config; + this.failureMonitor = new FailureMonitorImpl(config,provider); + this.client = provider.getClient(); + + final AtomicLong queueSize = new AtomicLong(); + //batch up sets of some size and send them in batch + this.consumer = Observable.create(producer) + .subscribeOn(Schedulers.io()) + .doOnNext(new Action1() { + @Override + public void call(IndexOperationMessage requestBuilderContainer) { + queueSize.addAndGet(requestBuilderContainer.getBuilder().size()); + } + }) + .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize()) + .doOnNext(new Action1>() { + @Override + public void call(List containerList) { + flushTimer.time(); + if(containerList.size()>0){ + execute(containerList); + } + } + }); + consumer.subscribe(); + } + + /** + * Execute the request, check for errors, then re-init the batch for future use + */ + private void execute(final List operationMessages) { + + if (operationMessages == null || operationMessages.size() == 0) { + return; + } + + //process and flatten all the messages to builder requests + Observable flattenMessages = Observable.from(operationMessages) + .subscribeOn(Schedulers.io()) + .flatMap(new Func1>() { + @Override + public Observable call(IndexOperationMessage operationMessage) { + return Observable.from(operationMessage.getBuilder()) + .map(new Func1() { + @Override + public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) { + return builder; + } + }); + } + }); + + //batch shard operations into a bulk request + flattenMessages + .buffer(config.getIndexBatchSize()) + .doOnNext(new Action1>() { + @Override + public void call(List builders) { + final BulkRequestBuilder bulkRequest = initRequest(); + for (ShardReplicationOperationRequestBuilder builder : builders) { + indexSizeCounter.dec(); + if (builder instanceof IndexRequestBuilder) { + bulkRequest.add((IndexRequestBuilder) builder); + } + if (builder instanceof DeleteRequestBuilder) { + bulkRequest.add((DeleteRequestBuilder) builder); + } + } + sendRequest(bulkRequest); + } + }) + .toBlocking().lastOrDefault(null); + + //call back all futures + Observable.from(operationMessages) + .subscribeOn(Schedulers.io()) + .doOnNext(new Action1() { + @Override + public void call(IndexOperationMessage operationMessage) { + operationMessage.getFuture().done(); + } + }) + .toBlocking().lastOrDefault(null); + } + + /** + * initialize request + * @return + */ + private BulkRequestBuilder initRequest() { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel())); + bulkRequest.setRefresh(config.isForcedRefresh()); + return bulkRequest; + } + + /** + * send bulk request + * @param bulkRequest + */ + private void sendRequest(BulkRequestBuilder bulkRequest) { + //nothing to do, we haven't added anthing to the index + if (bulkRequest.numberOfActions() == 0) { + return; + } + + final BulkResponse responses; + + try { + responses = bulkRequest.execute().actionGet(); + } catch (Throwable t) { + log.error("Unable to communicate with elasticsearch"); + failureMonitor.fail("Unable to execute batch", t); + throw t; + } + + failureMonitor.success(); + + for (BulkItemResponse response : responses) { + if (response.isFailed()) { + throw new RuntimeException("Unable to index documents. Errors are :" + + response.getFailure().getMessage()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java new file mode 100644 index 0000000..791cea8 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java @@ -0,0 +1,55 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.index.impl; + +import com.codahale.metrics.Counter; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.index.IndexBufferProducer; +import org.apache.usergrid.persistence.index.IndexOperationMessage; +import rx.Subscriber; + +/** + * Producer for index operation messages + */ +@Singleton +public class EsIndexBufferProducerImpl implements IndexBufferProducer { + + private final Counter indexSizeCounter; + private Subscriber subscriber; + + @Inject + public EsIndexBufferProducerImpl(MetricsFactory metricsFactory){ + this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class,"index.buffer.size"); + + } + @Override + public void call(Subscriber subscriber) { + this.subscriber = subscriber; + } + + public BetterFuture put(IndexOperationMessage message){ + indexSizeCounter.inc(message.getBuilder().size()); + subscriber.onNext(message); + return message.getFuture(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/489b2d78/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java deleted file mode 100644 index 92ab582..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.persistence.index.impl; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import org.apache.usergrid.persistence.core.future.BetterFuture; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.index.IndexBatchBuffer; -import org.apache.usergrid.persistence.index.IndexFig; -import org.apache.usergrid.persistence.index.IndexOperationMessage; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequestBuilder; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder; -import org.elasticsearch.client.Client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.Subscriber; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.schedulers.Schedulers; - -import java.util.List; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - - -/** - * Buffer index requests into sets to send. - */ -@Singleton -public class IndexBatchBufferImpl implements IndexBatchBuffer { - - private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class); - private final Counter indexSizeCounter; - private final Client client; - private final FailureMonitorImpl failureMonitor; - private final IndexFig config; - private final Timer flushTimer; - private final Counter bufferCounter; - private Observable> consumer; - private Producer producer; - - @Inject - public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){ - this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush"); - this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size"); - this.config = config; - this.failureMonitor = new FailureMonitorImpl(config,provider); - this.producer = new Producer(); - this.client = provider.getClient(); - bufferCounter = metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size"); - consumer(); - } - - private void consumer() { - final AtomicLong queueSize = new AtomicLong(); - //batch up sets of some size and send them in batch - this.consumer = Observable.create(producer) - .subscribeOn(Schedulers.io()) - .doOnNext(new Action1() { - @Override - public void call(IndexOperationMessage requestBuilderContainer) { - queueSize.addAndGet(requestBuilderContainer.getBuilder().size()); - } - }) - .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize()) - .doOnNext(new Action1>() { - @Override - public void call(List containerList) { - flushTimer.time(); - indexSizeCounter.dec(containerList.size()); - if(containerList.size()>0){ - execute(containerList); - } - } - }); - consumer.subscribe(); - } - - @Override - public BetterFuture put(IndexOperationMessage container){ - bufferCounter.inc(); - producer.put(container); - return container.getFuture(); - } - - - /** - * Execute the request, check for errors, then re-init the batch for future use - */ - private void execute(final List containers) { - - if (containers == null || containers.size() == 0) { - return; - } - - final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh()); - //clear the queue or proceed to buffer size - Observable.from(containers) - .subscribeOn(Schedulers.io()) - .flatMap(new Func1>() { - @Override - public Observable call(IndexOperationMessage requestBuilderContainer) { - return Observable.from(requestBuilderContainer.getBuilder()) - .map(new Func1() { - @Override - public ShardReplicationOperationRequestBuilder call(ShardReplicationOperationRequestBuilder builder) { - return builder; - } - }); - } - }) - .buffer(config.getIndexBatchSize()) - .doOnNext(new Action1>() { - @Override - public void call(List builders) { - final BulkRequestBuilder bulkRequest = initRequest(isForceRefresh.get()); - for (ShardReplicationOperationRequestBuilder builder : builders) { - if (builder instanceof IndexRequestBuilder) { - bulkRequest.add((IndexRequestBuilder) builder); - } - if (builder instanceof DeleteRequestBuilder) { - bulkRequest.add((DeleteRequestBuilder) builder); - } - } - sendRequest(bulkRequest); - } - }).toBlocking().lastOrDefault(null); - - for (IndexOperationMessage container : containers) { - container.done(); - } - } - - private BulkRequestBuilder initRequest(boolean refresh) { - BulkRequestBuilder bulkRequest = client.prepareBulk(); - bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel())); - bulkRequest.setRefresh(refresh); - return bulkRequest; - } - - private void sendRequest(BulkRequestBuilder bulkRequest) { - //nothing to do, we haven't added anthing to the index - if (bulkRequest.numberOfActions() == 0) { - return; - } - - final BulkResponse responses; - - try { - responses = bulkRequest.execute().actionGet(); - } catch (Throwable t) { - log.error("Unable to communicate with elasticsearch"); - failureMonitor.fail("Unable to execute batch", t); - throw t; - } - - failureMonitor.success(); - - for (BulkItemResponse response : responses) { - if (response.isFailed()) { - throw new RuntimeException("Unable to index documents. Errors are :" - + response.getFailure().getMessage()); - } - } - } - - - private static class Producer implements Observable.OnSubscribe { - - private Subscriber subscriber; - - @Override - public void call(Subscriber subscriber) { - this.subscriber = subscriber; - } - - public void put(IndexOperationMessage r){ - subscriber.onNext(r); - } - } - -}