usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [38/50] [abbrv] incubator-usergrid git commit: add new histogram for message cycle time
Date Mon, 29 Jun 2015 17:40:50 GMT
add new histogram for message cycle time


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/593d89c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/593d89c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/593d89c5

Branch: refs/heads/USERGRID-703
Commit: 593d89c596db8271c608db50829114bcba94c08d
Parents: ba7ec56
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Tue Jun 23 17:14:23 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Tue Jun 23 17:14:23 2015 -0600

----------------------------------------------------------------------
 .../index/impl/EsIndexBufferConsumerImpl.java        | 15 ++++++++-------
 .../index/impl/IndexOperationMessage.java            |  6 ++++++
 2 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/593d89c5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 0ba1d0b..67d2742 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.codahale.metrics.Histogram;
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -65,7 +66,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final Meter flushMeter;
     private final Timer produceTimer;
     private final IndexFig indexFig;
-    private final AtomicLong counter = new AtomicLong();
 
 
     private final Counter indexSizeCounter;
@@ -74,6 +74,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
 
     private final BufferProducer bufferProducer;
+    private final Histogram roundtripTimer;
 
 
     private AtomicLong inFlight = new AtomicLong();
@@ -87,11 +88,11 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer
{
         this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class,
"index_buffer.size");
         this.indexErrorCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class,
"index_buffer.error");
         this.offerTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.producer");
+        this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class,
"index_buffer.message_cycle");
 
         //wire up the gauge of inflight messages
         metricsFactory.addGauge(EsIndexBufferConsumerImpl.class, "index_buffer.inflight",
() -> inFlight.longValue());
 
-
         this.config = config;
         this.failureMonitor = new FailureMonitorImpl(config, provider);
         this.client = provider.getClient();
@@ -168,9 +169,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer
{
 
             final Observable<IndexOperation> index = Observable.from( batch.getIndexRequests()
);
             final Observable<DeIndexOperation> deIndex = Observable.from( batch.getDeIndexRequests()
);
-//            if(indexOperationSetSize +  deIndexOperationSetSize > 0){
-//                batch.done();
-//            }
 
             return Observable.merge( index, deIndex );
         } );
@@ -200,8 +198,11 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer
{
 
         //subscribe to the operations that generate requests on a new thread so that we can
execute them quickly
         //mark this as done
-        return processedIndexOperations.doOnNext( processedIndexOp -> processedIndexOp.done()
-        ).doOnError(t -> log.error("Unable to ack futures", t) );
+        return processedIndexOperations.doOnNext( processedIndexOp -> {
+                processedIndexOp.done();
+                roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
+            }
+        ).doOnError(t -> log.error("Unable to ack futures", t));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/593d89c5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index aaad0eb..0a49626 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -37,6 +37,7 @@ public class IndexOperationMessage implements Serializable {
     private final Set<IndexOperation> indexRequests;
     private final Set<DeIndexOperation> deIndexRequests;
 
+    private long creationTime;
 
 
     private final FutureObservable<IndexOperationMessage> containerFuture;
@@ -46,6 +47,7 @@ public class IndexOperationMessage implements Serializable {
         this.indexRequests = new HashSet<>();
         this.deIndexRequests = new HashSet<>();
         this.containerFuture = new FutureObservable<>( this );
+        this.creationTime = System.currentTimeMillis();
     }
 
 
@@ -118,4 +120,8 @@ public class IndexOperationMessage implements Serializable {
         //if this has been serialized, it could be null. don't NPE if it is, there's nothing
to ack
         containerFuture.done();
     }
+
+    public long getCreationTime() {
+        return creationTime;
+    }
 }


Mime
View raw message