accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [5/7] accumulo git commit: ACCUMULO-3853 Explicitly track sendQueue size
Date Wed, 27 May 2015 02:12:25 GMT
ACCUMULO-3853 Explicitly track sendQueue size

size() on ConcurrentLinkedQueue is a linear-time operation in the
number of nodes in the queue, which is expensive. Keep an explicit
count of the size to avoid this.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/caef59e4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/caef59e4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/caef59e4

Branch: refs/heads/master
Commit: caef59e4a61f23be5de5d609a9bc8f2dba8bde57
Parents: 5af27c2
Author: Josh Elser <josh.elser@gmail.com>
Authored: Tue May 26 18:42:34 2015 -0400
Committer: Josh Elser <josh.elser@gmail.com>
Committed: Tue May 26 21:00:12 2015 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/tracer/AsyncSpanReceiver.java    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/caef59e4/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
index 9b8705a..d3a2fc5 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.tracer;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.AbstractQueue;
@@ -27,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.trace.DistributedTrace;
 import org.apache.accumulo.tracer.thrift.Annotation;
@@ -38,8 +41,6 @@ import org.apache.htrace.TimelineAnnotation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 /**
  * Deliver Span information periodically to a destination.
  * <ul>
@@ -69,6 +70,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
 
   Timer timer = new Timer("SpanSender", true);
   protected final AbstractQueue<RemoteSpan> sendQueue = new ConcurrentLinkedQueue<RemoteSpan>();
+  protected final AtomicInteger sendQueueSize = new AtomicInteger(0);
   int maxQueueSize = 5000;
   long lastNotificationOfDroppedSpans = 0;
   int minSpanSize = 1;
@@ -112,6 +114,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
           sendQueue.remove();
           sendQueue.notifyAll();
         }
+        sendQueueSize.decrementAndGet();
         continue;
       }
       SpanKey dest = getSpanKey(s.data);
@@ -130,6 +133,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
             sendQueue.remove();
             sendQueue.notifyAll();
           }
+          sendQueueSize.decrementAndGet();
           sent = true;
         } catch (Exception ex) {
           log.warn("Got error sending to " + dest + ", refreshing client", ex);
@@ -168,7 +172,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
     SpanKey dest = getSpanKey(data);
     if (dest != null) {
       List<Annotation> annotations = convertToAnnotations(s.getTimelineAnnotations());
-      if (sendQueue.size() > maxQueueSize) {
+      if (sendQueueSize.get() > maxQueueSize) {
         long now = System.currentTimeMillis();
         if (now - lastNotificationOfDroppedSpans > 60 * 1000) {
           log.warn("Tracing spans are being dropped because there are already " + maxQueueSize + " spans queued for delivery.\n"
@@ -179,6 +183,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
       }
       sendQueue.add(new RemoteSpan(host, service == null ? s.getProcessId() : service, s.getTraceId(), s.getSpanId(), s.getParentId(), s.getStartTimeMillis(),
           s.getStopTimeMillis(), s.getDescription(), data, annotations));
+      sendQueueSize.incrementAndGet();
     }
   }
 


Mime
View raw message