accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject accumulo git commit: ACCUMULO-3862 improved how AsyncSpanReceiver drops short spans, added test for min span length
Date Fri, 29 May 2015 20:40:31 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.7 61bfbb22e -> 0eef354c5


ACCUMULO-3862 improved how AsyncSpanReceiver drops short spans, added test for min span length


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0eef354c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0eef354c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0eef354c

Branch: refs/heads/1.7
Commit: 0eef354c56e9ef2788c72f4dbaac596653e25bbe
Parents: 61bfbb2
Author: Billie Rinaldi <billie@apache.org>
Authored: Fri May 29 13:29:32 2015 -0700
Committer: Billie Rinaldi <billie@apache.org>
Committed: Fri May 29 13:29:40 2015 -0700

----------------------------------------------------------------------
 .../accumulo/tracer/AsyncSpanReceiver.java      |  14 +-
 .../accumulo/tracer/AsyncSpanReceiverTest.java  | 129 +++++++++++++++++++
 2 files changed, 134 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0eef354c/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
index 28a9088..a35734d 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
@@ -55,7 +55,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
 
   public static final String SEND_TIMER_MILLIS = "tracer.send.timer.millis";
   public static final String QUEUE_SIZE = "tracer.queue.size";
-  private static final String SPAN_MIN_MS = "tracer.span.min.ms";
+  public static final String SPAN_MIN_MS = "tracer.span.min.ms";
 
   private final Map<SpanKey,Destination> clients = new HashMap<SpanKey,Destination>();
 
@@ -109,14 +109,6 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
     while (!sendQueue.isEmpty()) {
       boolean sent = false;
       RemoteSpan s = sendQueue.peek();
-      if (s.stop - s.start < minSpanSize) {
-        synchronized (sendQueue) {
-          sendQueue.remove();
-          sendQueue.notifyAll();
-          sendQueueSize.decrementAndGet();
-        }
-        continue;
-      }
       SpanKey dest = getSpanKey(s.data);
       Destination client = clients.get(dest);
       if (client == null) {
@@ -167,6 +159,10 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
 
   @Override
   public void receiveSpan(Span s) {
+    if (s.getStopTimeMillis() - s.getStartTimeMillis() < minSpanSize) {
+      return;
+    }
+
     Map<String,String> data = convertToStrings(s.getKVAnnotations());
 
     SpanKey dest = getSpanKey(data);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0eef354c/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java
----------------------------------------------------------------------
diff --git a/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java b/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java
new file mode 100644
index 0000000..6744efc
--- /dev/null
+++ b/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tracer;
+
+import org.apache.accumulo.tracer.thrift.RemoteSpan;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.Span;
+import org.apache.htrace.impl.MilliSpan;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public class AsyncSpanReceiverTest {
+  static class TestReceiver extends AsyncSpanReceiver<String,String> {
+    AtomicInteger spansSent = new AtomicInteger(0);
+
+    TestReceiver() {
+      super(HTraceConfiguration.EMPTY);
+    }
+
+    TestReceiver(HTraceConfiguration conf) {
+      super(conf);
+    }
+
+    @Override
+    protected String createDestination(String o) throws Exception {
+      return "DEST";
+    }
+
+    @Override
+    protected void send(String resource, RemoteSpan span) throws Exception {
+      spansSent.incrementAndGet();
+    }
+
+    @Override
+    protected String getSpanKey(Map data) {
+      return "DEST";
+    }
+
+    int getSpansSent() {
+      return spansSent.get();
+    }
+
+    int getQueueSize() {
+      return sendQueueSize.get();
+    }
+  }
+
+  Span createSpan(long length) {
+    long time = System.currentTimeMillis();
+    Span span = new MilliSpan.Builder().begin(time).end(time + length).description("desc").parents(Collections.<Long> emptyList()).spanId(1).traceId(2).build();
+    return span;
+  }
+
+  @Test
+  public void test() throws InterruptedException {
+    TestReceiver receiver = new TestReceiver();
+
+    receiver.receiveSpan(createSpan(0));
+    while (receiver.getQueueSize() > 0) {
+      Thread.sleep(500);
+    }
+    assertEquals(0, receiver.getQueueSize());
+    assertEquals(0, receiver.getSpansSent());
+
+    receiver.receiveSpan(createSpan(1));
+    while (receiver.getQueueSize() > 0) {
+      Thread.sleep(500);
+    }
+    assertEquals(0, receiver.getQueueSize());
+    assertEquals(1, receiver.getSpansSent());
+  }
+
+  @Test
+  public void testKeepAll() throws InterruptedException {
+    TestReceiver receiver = new TestReceiver(HTraceConfiguration.fromMap(Collections.singletonMap(AsyncSpanReceiver.SPAN_MIN_MS, "0")));
+
+    receiver.receiveSpan(createSpan(0));
+    while (receiver.getQueueSize() > 0) {
+      Thread.sleep(500);
+    }
+    assertEquals(0, receiver.getQueueSize());
+    assertEquals(1, receiver.getSpansSent());
+  }
+
+  @Test
+  public void testExcludeMore() throws InterruptedException {
+    TestReceiver receiver = new TestReceiver(HTraceConfiguration.fromMap(Collections.singletonMap(AsyncSpanReceiver.SPAN_MIN_MS, "10")));
+
+    receiver.receiveSpan(createSpan(0));
+    while (receiver.getQueueSize() > 0) {
+      Thread.sleep(500);
+    }
+    assertEquals(0, receiver.getQueueSize());
+    assertEquals(0, receiver.getSpansSent());
+
+    receiver.receiveSpan(createSpan(9));
+    while (receiver.getQueueSize() > 0) {
+      Thread.sleep(500);
+    }
+    assertEquals(0, receiver.getQueueSize());
+    assertEquals(0, receiver.getSpansSent());
+
+    receiver.receiveSpan(createSpan(10));
+    while (receiver.getQueueSize() > 0) {
+      Thread.sleep(500);
+    }
+    assertEquals(0, receiver.getQueueSize());
+    assertEquals(1, receiver.getSpansSent());
+  }
+}


Mime
View raw message