Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A885210945 for ; Tue, 3 Mar 2015 20:00:57 +0000 (UTC) Received: (qmail 45503 invoked by uid 500); 3 Mar 2015 20:00:41 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 45452 invoked by uid 500); 3 Mar 2015 20:00:41 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 44957 invoked by uid 99); 3 Mar 2015 20:00:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 20:00:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1D789E102F; Tue, 3 Mar 2015 20:00:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: grey@apache.org To: commits@usergrid.apache.org Date: Tue, 03 Mar 2015 20:00:56 -0000 Message-Id: <72cfdc23e1f543d5b5f11dc740cfbfcb@git.apache.org> In-Reply-To: <31613d817b9b4e14abf7adb913ddece6@git.apache.org> References: <31613d817b9b4e14abf7adb913ddece6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/37] incubator-usergrid git commit: removing future, moving around some initialization code removing future, moving around some initialization code Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3231534d Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3231534d Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3231534d Branch: refs/heads/USERGRID-422 Commit: 3231534d75544779074e987bf2184028cb1c5dbd Parents: 3f637da Author: Shawn Feldman Authored: Tue Feb 24 16:02:11 2015 -0700 Committer: Shawn Feldman Committed: Tue Feb 24 16:02:11 2015 -0700 ---------------------------------------------------------------------- .../core/future/ObservableFuture.java | 42 --------------- .../index/impl/EsEntityIndexBatchImpl.java | 54 +++++++++++--------- .../index/impl/IndexBatchBufferImpl.java | 7 +-- 3 files changed, 35 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java deleted file mode 100644 index d08dd92..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.persistence.core.future; - -import rx.Observable; -import rx.Subscriber; - -/** - * - */ -public class ObservableFuture implements Observable.OnSubscribe { - - private Subscriber subscriber; - - @Override - public void call(Subscriber subscriber) { - this.subscriber = subscriber; - } - - public void done(T t){ - this.subscriber.onCompleted(); - } - - public void emit(T t){ - this.subscriber.onNext(t); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index 896c038..e008707 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.Futures; import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.core.rx.ObservableIterator; import org.apache.usergrid.persistence.index.*; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -104,6 +105,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope); this.alias = indexIdentifier.getAlias(); this.refresh = config.isForcedRefresh(); + //constrained this.promises = new ConcurrentLinkedQueue<>(); } @@ -147,10 +149,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { public EntityIndexBatch deindex( final IndexScope indexScope, final Id id, final UUID version) { IndexValidationUtils.validateIndexScope( indexScope ); - ValidationUtils.verifyIdentity( id ); + ValidationUtils.verifyIdentity(id); ValidationUtils.verifyVersion( version ); - final String context = createContextName( indexScope ); + final String context = createContextName(indexScope); final String entityType = id.getType(); final String indexId = createIndexDocId( id, version, context ); @@ -193,7 +195,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { } }).toBlocking().last(); - log.debug( "Deindexed Entity with index id " + indexId ); + log.debug("Deindexed Entity with index id " + indexId); return this; @@ -213,35 +215,41 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { return deindex( indexScope, entity.getId(), entity.getVersion() ); } - - @Override public void execute() { -// indexBatchBuffer.flush(); - Observable.from(promises) - .doOnNext(new Action1() { - @Override - public void call(BetterFuture betterFuture) { - betterFuture.get(); - } - }).toBlocking().lastOrDefault(null); - promises.clear(); + flushFutures(); } - - @Override public void executeAndRefresh() { -// indexBatchBuffer.flushAndRefresh(); - Iterator iterator = promises.iterator(); - while(iterator.hasNext()){ - iterator.next().get(); - } - promises.clear(); + flushFutures(); entityIndex.refresh(); - } + private void flushFutures() { + ObservableIterator iterator = new ObservableIterator("futures") { + @Override + protected Iterator getIterator() { + return promises.iterator(); + } + }; + Observable.create(iterator) + .doOnNext(new Action1() { + @Override + public void call(BetterFuture betterFuture) { + betterFuture.get(); + } + }) + .buffer(100) + .doOnNext(new Action1>() { + @Override + public void call(List betterFutures) { + promises.removeAll(betterFutures); + } + }) + .toBlocking() + .lastOrDefault(null); + } /** * Set the entity as a map with the context http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java index d7e6bf1..ad14920 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java @@ -59,6 +59,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer { private final IndexFig config; private final Timer flushTimer; private final ArrayBlockingQueue blockingQueue; + private final Counter bufferCounter; private Observable> consumer; private Producer producer; @@ -72,7 +73,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer { this.failureMonitor = new FailureMonitorImpl(config,provider); this.producer = new Producer(); this.client = provider.getClient(); - + bufferCounter = metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size"); consumer(); } @@ -97,7 +98,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer { @Override public BetterFuture put(IndexRequestBuilder builder){ RequestBuilderContainer container = new RequestBuilderContainer(builder); - metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc(); + bufferCounter.inc(); producer.put(container); return container.getFuture(); } @@ -105,7 +106,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer { @Override public BetterFuture put(DeleteRequestBuilder builder){ RequestBuilderContainer container = new RequestBuilderContainer(builder); - metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc(); + bufferCounter.inc(); producer.put(container); return container.getFuture(); }