htrace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [2/3] incubator-htrace git commit: HTRACE-237. Optimize htraced span receiver (cmccabe)
Date Mon, 12 Oct 2015 20:13:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java
new file mode 100644
index 0000000..8b59a72
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.htrace.core.Span;
+
+class PackedBufferManager implements BufferManager {
+  private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
+  private static final int MAX_PREQUEL_LENGTH = 2048;
+  private static final int METHOD_ID_WRITE_SPANS = 0x1;
+  private final Conf conf;
+  private final ByteBuffer frameBuffer;
+  private final PackedBuffer prequel;
+  private final PackedBuffer spans;
+  private final Selector selector;
+  private int numSpans;
+
+  PackedBufferManager(Conf conf) throws IOException {
+    this.conf = conf;
+    this.frameBuffer = ByteBuffer.allocate(PackedBuffer.HRPC_REQ_FRAME_LENGTH);
+    this.prequel = new PackedBuffer(ByteBuffer.allocate(MAX_PREQUEL_LENGTH));
+    this.spans = new PackedBuffer(ByteBuffer.allocate(conf.bufferSize));
+    this.selector = SelectorProvider.provider().openSelector();
+    clear();
+  }
+
+  @Override
+  public void writeSpan(Span span) throws IOException {
+    spans.writeSpan(span);
+    numSpans++;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("wrote " + span.toJson() + " to PackedBuffer for " +
+          conf.endpointStr + ". numSpans = " + numSpans +
+          ", buffer position = " + spans.getBuffer().position());
+    }
+  }
+
+  @Override
+  public int contentLength() {
+    return spans.getBuffer().position();
+  }
+
+  @Override
+  public int getNumberOfSpans() {
+    return numSpans;
+  }
+
+  @Override
+  public void prepare() throws IOException {
+    prequel.beginWriteSpansRequest(null, numSpans);
+    long totalLength =
+        prequel.getBuffer().position() + spans.getBuffer().position();
+    if (totalLength > PackedBuffer.MAX_HRPC_BODY_LENGTH) {
+      throw new IOException("Can't send RPC of " + totalLength + " bytes " +
+          "because it is longer than " + PackedBuffer.MAX_HRPC_BODY_LENGTH);
+    }
+    PackedBuffer.writeReqFrame(frameBuffer,
+        METHOD_ID_WRITE_SPANS, 1, (int)totalLength);
+    frameBuffer.flip();
+    prequel.getBuffer().flip();
+    spans.getBuffer().flip();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Preparing to send RPC of length " +
+          (totalLength + PackedBuffer.HRPC_REQ_FRAME_LENGTH) + " to " +
+          conf.endpointStr + ", containing " + numSpans + " spans.");
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    SelectionKey sockKey = null;
+    IOException ioe = null;
+    frameBuffer.position(0);
+    prequel.getBuffer().position(0);
+    spans.getBuffer().position(0);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Preparing to flush " + numSpans + " spans to " +
+          conf.endpointStr);
+    }
+    try {
+      sockKey = doConnect();
+      doSend(sockKey, new ByteBuffer[] {
+          frameBuffer, prequel.getBuffer(), spans.getBuffer() });
+      ByteBuffer response = prequel.getBuffer();
+      readAndValidateResponseFrame(sockKey, response,
+          1, METHOD_ID_WRITE_SPANS);
+    } catch (IOException e) {
+      // This LOG message is only at debug level because we also log these
+      // exceptions at error level inside HTracedReceiver.  The logging in
+      // HTracedReceiver is rate-limited to avoid overwhelming the client log
+      // if htraced goes down.  The debug and trace logging is not
+      // rate-limited.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got exception during flush", e);
+      }
+      ioe = e;
+    } finally {
+      if (sockKey != null) {
+        sockKey.cancel();
+        try {
+          SocketChannel sock = (SocketChannel)sockKey.attachment();
+          sock.close();
+        } catch (IOException e) {
+          if (ioe != null) {
+            ioe.addSuppressed(e);
+          }
+        }
+      }
+    }
+    if (ioe != null) {
+      throw ioe;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Successfully flushed " + numSpans + " spans to " +
+          conf.endpointStr);
+    }
+  }
+
+  private long updateRemainingMs(long startMs, long timeoutMs) {
+    long deltaMs = TimeUtil.deltaMs(startMs, TimeUtil.nowMs());
+    if (deltaMs > timeoutMs) {
+      return 0;
+    }
+    return timeoutMs - deltaMs;
+  }
+
+  private SelectionKey doConnect() throws IOException {
+    SocketChannel sock = SocketChannel.open();
+    SelectionKey sockKey = null;
+    boolean success = false;
+    try {
+      if (sock.isBlocking()) {
+        sock.configureBlocking(false);
+      }
+      InetSocketAddress resolvedEndpoint =
+          new InetSocketAddress(conf.endpoint.getHostString(),
+              conf.endpoint.getPort());
+      resolvedEndpoint.getHostName(); // trigger DNS resolution
+      sock.connect(resolvedEndpoint);
+      sockKey = sock.register(selector, SelectionKey.OP_CONNECT, sock);
+      long startMs = TimeUtil.nowMs();
+      long remainingMs = conf.connectTimeoutMs;
+      while (true) {
+        selector.select(remainingMs);
+        for (SelectionKey key : selector.keys()) {
+          if (key.isConnectable()) {
+            SocketChannel s = (SocketChannel)key.attachment();
+            s.finishConnect();
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Successfully connected to " + conf.endpointStr + ".");
+            }
+            success = true;
+            return sockKey;
+          }
+        }
+        remainingMs = updateRemainingMs(startMs, conf.connectTimeoutMs);
+        if (remainingMs == 0) {
+          throw new IOException("Attempt to connect to " + conf.endpointStr +
+              " timed out after " +  TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+              " ms.");
+        }
+      }
+    } finally {
+      if (!success) {
+        if (sockKey != null) {
+          sockKey.cancel();
+        }
+        sock.close();
+      }
+    }
+  }
+
+  /**
+   * Send the provided ByteBuffer objects.
+   *
+   * We use non-blocking I/O because Java does not provide write timeouts.
+   * Without a write timeout, the socket could get hung and we'd never recover.
+   * We also use the GatheringByteChannel#write method which calls the pread()
+   * system call under the covers.  This ensures that even if TCP_NODELAY is on,
+   * we send the minimal number of packets.
+   */
+  private void doSend(SelectionKey sockKey, ByteBuffer[] bufs)
+        throws IOException {
+    long totalWritten = 0;
+    sockKey.interestOps(SelectionKey.OP_WRITE);
+    SocketChannel sock = (SocketChannel)sockKey.attachment();
+    long startMs = TimeUtil.nowMs();
+    long remainingMs = conf.ioTimeoutMs;
+    while (true) {
+      selector.select(remainingMs);
+      int firstBuf = 0;
+      for (SelectionKey key : selector.selectedKeys()) {
+        if (key.isWritable()) {
+          long written = sock.write(bufs, firstBuf, bufs.length - firstBuf);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Sent " + written + " bytes to " + conf.endpointStr);
+          }
+          totalWritten += written;
+        }
+      }
+      while (true) {
+        if (firstBuf == bufs.length) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Finished sending " + totalWritten + " bytes to " +
+                conf.endpointStr);
+          }
+          return;
+        }
+        if (bufs[firstBuf].remaining() > 0) {
+          break;
+        }
+        firstBuf++;
+      }
+      remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
+      if (remainingMs == 0) {
+        throw new IOException("Attempt to write to " + conf.endpointStr +
+            " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+            " ms.");
+      }
+    }
+  }
+
+  private void doRecv(SelectionKey sockKey, ByteBuffer response)
+      throws IOException {
+    sockKey.interestOps(SelectionKey.OP_READ);
+    SocketChannel sock = (SocketChannel)sockKey.attachment();
+    int totalRead = response.remaining();
+    long startMs = TimeUtil.nowMs();
+    long remainingMs = conf.ioTimeoutMs;
+    while (remainingMs > 0) {
+      selector.select(remainingMs);
+      for (SelectionKey key : selector.selectedKeys()) {
+        if (key.isReadable()) {
+          sock.read(response);
+        }
+      }
+      if (response.remaining() == 0) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Received all " + totalRead + " bytes from " +
+              conf.endpointStr);
+        }
+        return;
+      }
+      remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Received " + (totalRead - response.remaining()) +
+                " out of " + totalRead + " bytes from " + conf.endpointStr);
+      }
+      if (remainingMs == 0) {
+        throw new IOException("Attempt to write to " + conf.endpointStr +
+            " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) +
+            " ms.");
+      }
+    }
+  }
+
+  private void readAndValidateResponseFrame(SelectionKey sockKey,
+        ByteBuffer buf, long expectedSeq, int expectedMethodId)
+          throws IOException {
+    buf.clear();
+    buf.limit(PackedBuffer.HRPC_RESP_FRAME_LENGTH);
+    doRecv(sockKey, buf);
+    buf.flip();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+    long seq = buf.getLong();
+    if (seq != expectedSeq) {
+      throw new IOException("Expected sequence number " + expectedSeq +
+          ", but got sequence number " + seq);
+    }
+    int methodId = buf.getInt();
+    if (expectedMethodId != methodId) {
+      throw new IOException("Expected method id " + expectedMethodId +
+          ", but got " + methodId);
+    }
+    int errorLength = buf.getInt();
+    buf.getInt();
+    if ((errorLength < 0) ||
+        (errorLength > PackedBuffer.MAX_HRPC_ERROR_LENGTH)) {
+      throw new IOException("Got server error with invalid length " +
+          errorLength);
+    } else if (errorLength > 0) {
+      buf.clear();
+      buf.limit(errorLength);
+      doRecv(sockKey, buf);
+      buf.flip();
+      CharBuffer charBuf = StandardCharsets.UTF_8.decode(buf);
+      String serverErrorStr = charBuf.toString();
+      throw new IOException("Got server error " + serverErrorStr);
+    }
+  }
+
+  @Override
+  public void clear() {
+    frameBuffer.clear();
+    prequel.getBuffer().clear();
+    spans.getBuffer().clear();
+    numSpans = 0;
+  }
+
+  @Override
+  public void close() {
+    clear();
+    prequel.close();
+    spans.close();
+    try {
+      selector.close();
+    } catch (IOException e) {
+      LOG.warn("Error closing selector", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java
new file mode 100644
index 0000000..ac42ee8
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * A logger which rate-limits its output: after a message has been emitted,
+ * further messages are dropped until the configured timeout has elapsed.
+ *
+ * The limit is shared by all the logging methods of an instance.  Instances
+ * may be used from multiple threads.
+ */
+class RateLimitedLogger {
+  private final Log log;
+  private final long timeoutMs;
+
+  /** Monotonic time of the last emitted message; guarded by this. */
+  private long lastLogTimeMs;
+
+  /**
+   * Whether any message has been emitted yet; guarded by this.
+   *
+   * System.nanoTime() has an arbitrary origin and may even be negative, so
+   * no particular value of lastLogTimeMs can stand for "never logged".
+   */
+  private boolean hasLogged;
+
+  /**
+   * @param log        The underlying log to write to.
+   * @param timeoutMs  The minimum number of milliseconds between two
+   *                   emitted messages.
+   */
+  public RateLimitedLogger(Log log, long timeoutMs) {
+    this.log = log;
+    this.timeoutMs = timeoutMs;
+    this.lastLogTimeMs = 0L;
+    this.hasLogged = false;
+  }
+
+  /**
+   * Log a message at warn level, unless it is rate-limited away.
+   *
+   * @param what  The message.
+   */
+  public void warn(String what) {
+    if (shouldLog()) {
+      log.warn(what);
+    }
+  }
+
+  /**
+   * Log a message at error level, unless it is rate-limited away.
+   *
+   * @param what  The message.
+   */
+  public void error(String what) {
+    if (shouldLog()) {
+      log.error(what);
+    }
+  }
+
+  /**
+   * Log a message and an exception at error level, unless they are
+   * rate-limited away.
+   *
+   * @param what  The message.
+   * @param e     The exception to log with the message.
+   */
+  public void error(String what, Throwable e) {
+    if (shouldLog()) {
+      log.error(what, e);
+    }
+  }
+
+  /**
+   * Decide whether a message may be emitted now, and if so, record the
+   * current time as the time of the last emitted message.
+   *
+   * @return  True if the caller should emit its message.
+   */
+  private synchronized boolean shouldLog() {
+    long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
+        TimeUnit.NANOSECONDS);
+    // Compare elapsed time rather than absolute times, so that the result
+    // does not depend on the origin of the monotonic clock.
+    if (hasLogged && ((now - lastLogTimeMs) < timeoutMs)) {
+      return false;
+    }
+    hasLogged = true;
+    lastLogTimeMs = now;
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
new file mode 100644
index 0000000..2e1aa70
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.htrace.core.Span;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpStatus;
+
+/**
+ * A BufferManager which accumulates spans as JSON and POSTs them to the
+ * endpoint's /writeSpans URL using a Jetty HttpClient.
+ *
+ * The request body is assembled from three buffers: a prequel which opens
+ * the JSON document, the comma-separated span objects, and an epilogue
+ * which closes the document.
+ */
+class RestBufferManager implements BufferManager {
+  private static final Log LOG = LogFactory.getLog(RestBufferManager.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ObjectWriter JSON_WRITER = OBJECT_MAPPER.writer();
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  private static final byte COMMA_BYTE = (byte)0x2c;
+  private static final int MAX_PREQUEL_LENGTH = 512;
+  private static final int MAX_EPILOGUE_LENGTH = 32;
+  private final Conf conf;
+  private final HttpClient httpClient;
+  private final String urlString;
+  private final ByteBuffer prequel;
+  private final ByteBuffer spans;
+  private final ByteBuffer epilogue;
+  private int numSpans;
+
+  /**
+   * A ContentProvider which hands the given buffers to Jetty one after
+   * another, without copying them.
+   */
+  private static class RestBufferManagerContentProvider
+      implements ContentProvider {
+    private final ByteBuffer[] bufs;
+
+    /**
+     * Iterates over the buffers of the enclosing provider in order.
+     */
+    private class ByteBufferIterator implements Iterator<ByteBuffer> {
+      /** Index of the buffer returned most recently, or -1 if none. */
+      private int bufIdx = -1;
+
+      @Override
+      public boolean hasNext() {
+        return (bufIdx + 1) < bufs.length;
+      }
+
+      @Override
+      public ByteBuffer next() {
+        if ((bufIdx + 1) >= bufs.length) {
+          throw new NoSuchElementException();
+        }
+        bufIdx++;
+        return bufs[bufIdx];
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    RestBufferManagerContentProvider(ByteBuffer[] bufs) {
+      this.bufs = bufs;
+    }
+
+    /**
+     * @return The total number of bytes remaining in all the buffers.
+     */
+    @Override
+    public long getLength() {
+      long total = 0;
+      for (int i = 0; i < bufs.length; i++) {
+        total += bufs[i].remaining();
+      }
+      return total;
+    }
+
+    @Override
+    public Iterator<ByteBuffer> iterator() {
+      return new ByteBufferIterator();
+    }
+  }
+
+  /**
+   * Create an HttpClient instance.
+   *
+   * @param connTimeout         The timeout to use for connecting.
+   * @param idleTimeout         The idle timeout to use.
+   * @return                    The new client.  It has not been started.
+   */
+  static HttpClient createHttpClient(long connTimeout, long idleTimeout) {
+    HttpClient httpClient = new HttpClient();
+    httpClient.setUserAgentField(
+        new HttpField(HttpHeader.USER_AGENT, "HTracedSpanReceiver"));
+    httpClient.setConnectTimeout(connTimeout);
+    httpClient.setIdleTimeout(idleTimeout);
+    return httpClient;
+  }
+
+  /**
+   * Allocate the buffers, then create and start the HTTP client.
+   *
+   * @param conf        The configuration to use.
+   * @throws Exception  If the URL cannot be built or the client cannot be
+   *                    started.
+   */
+  RestBufferManager(Conf conf) throws Exception {
+    this.conf = conf;
+    this.httpClient =
+        createHttpClient(conf.connectTimeoutMs, conf.idleTimeoutMs);
+    this.urlString = new URL("http", conf.endpoint.getHostName(),
+        conf.endpoint.getPort(), "/writeSpans").toString();
+    this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH);
+    this.spans = ByteBuffer.allocate(conf.bufferSize);
+    this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH);
+    clear();
+    this.httpClient.start();
+  }
+
+  /**
+   * Serialize a span to JSON and append it to the span buffer.
+   *
+   * Every span, including the first, is preceded by a comma.  flush() skips
+   * the leading comma when it sends the buffer.
+   *
+   * @param span          The span to serialize.
+   * @throws IOException  If serialization fails, or if the span does not
+   *                      fit in the space left in the buffer.
+   */
+  @Override
+  public void writeSpan(Span span) throws IOException {
+    byte[] spanJsonBytes = JSON_WRITER.writeValueAsBytes(span);
+    if ((spans.capacity() - spans.position()) < (spanJsonBytes.length + 1)) {
+      // Make sure we have enough space for the span JSON and a comma.
+      throw new IOException("Not enough space remaining in span buffer.");
+    }
+    spans.put(COMMA_BYTE);
+    spans.put(spanJsonBytes);
+    numSpans++;
+  }
+
+  /**
+   * @return The position of the span buffer minus one (for the leading
+   *         comma), or 0 if the buffer is at its start.
+   */
+  @Override
+  public int contentLength() {
+    return Math.max(spans.position() - 1, 0);
+  }
+
+  /**
+   * @return The number of spans written since the last clear().
+   */
+  @Override
+  public int getNumberOfSpans() {
+    return numSpans;
+  }
+
+  /**
+   * Fill in the prequel and epilogue and flip all three buffers so that
+   * they are ready to be sent.
+   */
+  @Override
+  public void prepare() throws IOException {
+    // contentLength() is derived from the span buffer's position, which
+    // flip() resets to 0, so take the measurement before flipping.
+    int spanBytes = contentLength();
+
+    String prequelString = "{\"Spans\":[";
+    prequel.put(prequelString.getBytes(UTF8));
+    prequel.flip();
+
+    spans.flip();
+
+    String epilogueString = "]}";
+    epilogue.put(epilogueString.getBytes(UTF8));
+    epilogue.flip();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Preparing to send " + spanBytes + " bytes of span " +
+          "data to " + conf.endpointStr + ", containing " + numSpans +
+          " spans.");
+    }
+  }
+
+  /**
+   * POST the prepared buffers to the endpoint as one JSON document.
+   *
+   * The buffers are repositioned first, so this method can be called again
+   * to resend the same data.
+   *
+   * @throws IOException  If the request fails, times out or is interrupted,
+   *                      or if the server answers with a status other than
+   *                      200.  When the calling thread was interrupted, its
+   *                      interrupt status is set again before throwing.
+   */
+  @Override
+  public void flush() throws IOException {
+    // Position the buffers at the beginning.
+    prequel.position(0);
+    spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma
+    epilogue.position(0);
+
+    RestBufferManagerContentProvider contentProvider =
+        new RestBufferManagerContentProvider(
+            new ByteBuffer[] { prequel, spans, epilogue });
+    long rpcLength = contentProvider.getLength();
+    try {
+      Request request = httpClient.
+          newRequest(urlString).method(HttpMethod.POST);
+      request.header(HttpHeader.CONTENT_TYPE, "application/json");
+      request.content(contentProvider);
+      ContentResponse response = request.send();
+      if (response.getStatus() != HttpStatus.OK_200) {
+        throw new IOException("Got back error response " +
+            response.getStatus() + " from " + conf.endpointStr + "; " +
+            response.getContentAsString());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sent WriteSpansReq of length " + rpcLength + " to " + conf.endpointStr);
+      }
+    } catch (InterruptedException e) {
+      // Catching InterruptedException clears the interrupt status.  Set it
+      // again so that code further up the stack can still observe the
+      // interruption.
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while sending spans via REST", e);
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out sending spans via REST", e);
+    } catch (ExecutionException e) {
+      throw new IOException("Execution exception sending spans via REST", e);
+    }
+  }
+
+  /**
+   * Reset all buffers and the span count so that a new batch can be
+   * accumulated.
+   */
+  @Override
+  public void clear() {
+    prequel.clear();
+    spans.clear();
+    epilogue.clear();
+    numSpans = 0;
+  }
+
+  /**
+   * Stop the HTTP client.  A failure to stop it is logged, not thrown.
+   */
+  @Override
+  public void close() {
+    try {
+      httpClient.stop();
+    } catch (Exception e) {
+      LOG.error("Error stopping HTracedReceiver httpClient", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java
new file mode 100644
index 0000000..7361c97
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.math.BigInteger;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utilities for dealing with monotonic time.
+ */
+class TimeUtil {
+  /**
+   * Returns the current monotonic time in milliseconds.
+   */
+  static long nowMs() {
+    return TimeUnit.MILLISECONDS.convert(
+        System.nanoTime(), TimeUnit.NANOSECONDS);
+  }
+
+  /**
+   * Get the approximate delta between two monotonic times.
+   *
+   * This function makes the following assumptions:
+   * 1. We read startMs from the monotonic clock prior to endMs.
+   * 2. The two times are not more than 100 years or so apart.
+   *
+   * With these two assumptions in hand, we can smooth over some of the
+   * unpleasant features of the monotonic clock:
+   * 1. It can return either positive or negative values.
+   * 2. When the number of nanoseconds reaches Long.MAX_VALUE it wraps around
+   * to Long.MIN_VALUE.
+   * 3. On some messed up systems it has been known to jump backwards every
+   * now and then.  Oops.  CPU core synchronization mumble mumble.
+   *
+   * @param startMs  The start time.
+   * @param endMs    The end time.
+   * @return         The delta between the two times.
+   */
+  static long deltaMs(long startMs, long endMs) {
+    BigInteger startNs = BigInteger.valueOf(TimeUnit.NANOSECONDS.
+        convert(startMs, TimeUnit.MILLISECONDS));
+    BigInteger endNs = BigInteger.valueOf(TimeUnit.NANOSECONDS.
+        convert(endMs, TimeUnit.MILLISECONDS));
+    BigInteger deltaNs = endNs.subtract(startNs);
+    if (deltaNs.signum() >= 0) {
+      return TimeUnit.MILLISECONDS.convert(deltaNs.min(
+          BigInteger.valueOf(Long.MAX_VALUE)).longValue(), TimeUnit.NANOSECONDS);
+    }
+    deltaNs = deltaNs.negate();
+    if (deltaNs.compareTo(BigInteger.valueOf(Long.MAX_VALUE / 2)) < 0) {
+      // If the 'startNs' is numerically less than the 'endNs', and the
+      // difference between the two is less than 100 years, it's probably
+      // just clock jitter.  Certain old OSes and CPUs had monotonic clocks
+      // that could go backwards under certain conditions (ironic, given
+      // the name).
+      return 0L;
+    }
+    // Handle rollover.
+    BigInteger revDeltaNs = BigInteger.ONE.shiftLeft(64).subtract(deltaNs);
+    return TimeUnit.MILLISECONDS.convert(revDeltaNs.min(
+        BigInteger.valueOf(Long.MAX_VALUE)).longValue(), TimeUnit.NANOSECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java
new file mode 100644
index 0000000..f224f6f
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/DataDir.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
+
+/**
+ * A scratch directory for a test run.  Each instance creates its own
+ * directory at BASE/test-data/UUID, where BASE is the value of the
+ * test.data.base.dir system property, or "target" if the property is not
+ * set.  Hand the result of get() to whatever daemons and tests need a place
+ * to dump data.
+ */
+public class DataDir implements Closeable {
+  private final File dir;
+
+  /**
+   * Create a fresh, uniquely named directory, along with any missing parent
+   * directories.
+   *
+   * @throws IOException  If the directory cannot be created.
+   */
+  public DataDir() throws IOException {
+    File base = new File(System.getProperty("test.data.base.dir", "target"));
+    File uniqueDir =
+        new File(new File(base, "test-data"), UUID.randomUUID().toString());
+    Files.createDirectories(uniqueDir.toPath());
+    this.dir = uniqueDir;
+  }
+
+  /**
+   * @return The directory created by the constructor.
+   */
+  public File get() {
+    return dir;
+  }
+
+  /**
+   * Does nothing: the directory and its contents are left on disk.
+   *
+   * The cleanup which used to live here (deleting each file in the
+   * directory and then the directory itself) is disabled.
+   */
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public String toString() {
+    return "DataDir{" + dir.getAbsolutePath() + "}";
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
new file mode 100644
index 0000000..26c1a10
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/HTracedProcess.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.file.Paths;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.http.HttpStatus;
+import org.junit.Assert;
+
+/**
+ * To get an instance of htraced up and running, create an instance of this class.
+ * Upon successful construction, htraced is running using <code>dataDir</code> as directory to
+ * host data (leveldbs and logs).
+ */
+class HTracedProcess extends Process {
+  private static final Log LOG = LogFactory.getLog(HTracedProcess.class);
+
+  static class Builder {
+    String host = "localhost";
+
+    Builder() {
+    }
+
+    Builder host(String host) {
+      this.host = host;
+      return this;
+    }
+
+    HTracedProcess build() throws Exception {
+      return new HTracedProcess(this);
+    }
+  }
+
+  /**
+   * Path to the htraced binary.
+   */
+  private final File htracedPath;
+
+  /**
+   * Temporary directory for test files.
+   */
+  private final DataDir dataDir;
+
+  /**
+   * The Java Process object for htraced.
+   */
+  private final Process delegate;
+
+  /**
+   * The HTTP host:port returned from htraced.
+   */
+  private final String httpAddr;
+
+  /**
+   * The HRPC host:port returned from htraced.
+   */
+  private final String hrpcAddr;
+
+  /**
+   * REST client to use to talk to htraced.
+   */
+  private final HttpClient httpClient;
+
+  /**
+   * Data sent back from the HTraced process on the notification port.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class StartupNotificationData {
+    /**
+     * The hostname:port pair which the HTraced process uses for HTTP requests.
+     */
+    @JsonProperty("HttpAddr")
+    String httpAddr;
+
+    /**
+     * The hostname:port pair which the HTraced process uses for HRPC requests.
+     */
+    @JsonProperty("HrpcAddr")
+    String hrpcAddr;
+
+    /**
+     * The process ID of the HTraced process.
+     */
+    @JsonProperty("ProcessId")
+    long processId;
+  }
+
+  private HTracedProcess(Builder builder) throws Exception {
+    this.htracedPath = Paths.get(
+        "target", "..", "go", "build", "htraced").toFile();
+    if (!this.htracedPath.exists()) {
+      throw new RuntimeException("No htraced binary exists at " +
+          this.htracedPath);
+    }
+    this.dataDir = new DataDir();
+    // Create a notifier socket bound to a random port.
+    ServerSocket listener = new ServerSocket(0);
+    boolean success = false;
+    Process process = null;
+    HttpClient http = null;
+    try {
+      // Use a random port for the web address.  No 'scheme' yet.
+      String random = builder.host + ":0";
+      String logPath = new File(dataDir.get(), "log.txt").getAbsolutePath();
+      // Pass cmdline args to htraced so it uses our test dir for data.
+      ProcessBuilder pb = new ProcessBuilder(htracedPath.getAbsolutePath(),
+        "-Dlog.level=TRACE",
+        "-Dlog.path=" + logPath,
+        "-Dweb.address=" + random,
+        "-Dhrpc.address=" + random,
+        "-Ddata.store.clear=true",
+        "-Dstartup.notification.address=localhost:" + listener.getLocalPort(),
+        "-Ddata.store.directories=" + dataDir.get().getAbsolutePath());
+      pb.redirectErrorStream(true);
+      // Inherit STDERR/STDOUT i/o; dumps on console for now.  Can add logs later.
+      pb.inheritIO();
+      pb.directory(dataDir.get());
+      //assert pb.redirectInput() == Redirect.PIPE;
+      //assert pb.redirectOutput().file() == dataDir;
+      process = pb.start();
+      assert process.getInputStream().read() == -1;
+      StartupNotificationData data = readStartupNotification(listener);
+      httpAddr = data.httpAddr;
+      hrpcAddr = data.hrpcAddr;
+      LOG.info("Started htraced process " + data.processId + " with http " +
+               "address " + data.httpAddr + ", logging to " + logPath);
+      http = RestBufferManager.createHttpClient(60000L, 60000L);
+      http.start();
+      success = true;
+    } finally {
+      if (!success) {
+        // Clean up after failure
+        if (process != null) {
+          process.destroy();
+          process = null;
+        }
+        if (http != null) {
+          http.stop();
+        }
+      }
+      delegate = process;
+      listener.close();
+      httpClient = http;
+    }
+  }
+
+  private static StartupNotificationData
+      readStartupNotification(ServerSocket listener) throws IOException {
+    Socket socket = listener.accept();
+    try {
+      InputStream in = socket.getInputStream();
+      ObjectMapper objectMapper = new ObjectMapper();
+      StartupNotificationData data = objectMapper.
+          readValue(in, StartupNotificationData.class);
+      return data;
+    } finally {
+      socket.close();
+    }
+  }
+
+  public int hashCode() {
+    return delegate.hashCode();
+  }
+
+  public OutputStream getOutputStream() {
+    throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT");
+  }
+
+  public InputStream getInputStream() {
+    throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT");
+  }
+
+  /**
+   * Compares this wrapper with another object.
+   *
+   * Two HTracedProcess instances are equal when their underlying delegate
+   * processes are equal, which keeps equals() consistent with hashCode()
+   * (also derived from the delegate).  Comparing the delegate directly with
+   * the argument, as was done previously, made even x.equals(x) return false
+   * because the delegate is never the wrapper itself.
+   *
+   * @param obj The object to compare against; may be null.
+   * @return    True if obj is this object or another HTracedProcess wrapping
+   *            an equal delegate process.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof HTracedProcess)) {
+      return false;
+    }
+    return delegate.equals(((HTracedProcess) obj).delegate);
+  }
+
+  public InputStream getErrorStream() {
+    throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT");
+  }
+
+  public int waitFor() throws InterruptedException {
+    return delegate.waitFor();
+  }
+
+  public int exitValue() {
+    return delegate.exitValue();
+  }
+
+  public void destroy() {
+    try {
+      httpClient.stop();
+    } catch (Exception e) {
+      LOG.error("Error stopping httpClient", e);
+    }
+    delegate.destroy();
+    try {
+      dataDir.close();
+    } catch (Exception e) {
+      LOG.error("Error closing " + dataDir, e);
+    }
+    LOG.trace("Destroyed htraced process.");
+  }
+
+  public String toString() {
+    return delegate.toString();
+  }
+
+  public String getHttpAddr() {
+    return httpAddr;
+  }
+
+  public String getHrpcAddr() {
+    return hrpcAddr;
+  }
+
+  /**
+   * Ugly but how else to do file-math?
+   * @param topLevel Presumes top-level of the htrace checkout.
+   * @return Path to the htraced binary.
+   */
+  public static File getPathToHTraceBinaryFromTopLevel(final File topLevel) {
+    return new File(new File(new File(new File(topLevel, "htrace-htraced"), "go"), "build"),
+      "htraced");
+  }
+
+  public String getServerInfoJson() throws Exception {
+    ContentResponse response = httpClient.GET(
+        new URI(String.format("http://%s/server/info", httpAddr)));
+    Assert.assertEquals("application/json", response.getMediaType());
+    Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
+    return response.getContentAsString();
+  }
+
+  public Span getSpan(SpanId spanId) throws Exception {
+    ContentResponse response = httpClient.GET(
+        new URI(String.format("http://%s/span/%s",
+            httpAddr, spanId.toString())));
+    Assert.assertEquals("application/json", response.getMediaType());
+    String responseJson = response.getContentAsString().trim();
+    if (responseJson.isEmpty()) {
+      return null;
+    }
+    return MilliSpan.fromJson(responseJson);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
deleted file mode 100644
index d52f071..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URL;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.MilliSpan;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanId;
-import org.apache.htrace.core.TracerId;
-import org.apache.htrace.util.DataDir;
-import org.apache.htrace.util.HTracedProcess;
-import org.apache.htrace.util.TestUtil;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.api.ContentResponse;
-import org.eclipse.jetty.http.HttpStatus;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestHTracedRESTReceiver {
-  private static final Log LOG =
-      LogFactory.getLog(TestHTracedRESTReceiver.class);
-  private URL restServerUrl;
-  private DataDir dataDir;
-  HTracedProcess htraced;
-
-  @Before
-  public void setUp() throws Exception {
-    this.dataDir = new DataDir();
-    File tlDir = DataDir.getTopLevelOfCheckout(this.dataDir.getDataDir());
-    File pathToHTracedBinary = HTracedProcess.getPathToHTraceBinaryFromTopLevel(tlDir);
-    this.htraced = new HTracedProcess(pathToHTracedBinary,
-        dataDir.getDataDir(), "localhost");
-    this.restServerUrl = new URL("http://" + htraced.getHttpAddr() + "/");
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (this.htraced != null) this.htraced.destroy();
-  }
-
-  /**
-   * Our simple version of htrace configuration for testing.
-   */
-  private final class TestHTraceConfiguration extends HTraceConfiguration {
-    private final URL restServerUrl;
-    final static String TRACER_ID = "TestHTracedRESTReceiver";
-
-    public TestHTraceConfiguration(final URL restServerUrl) {
-      this.restServerUrl = restServerUrl;
-    }
-
-    @Override
-    public String get(String key) {
-      return null;
-    }
-
-    @Override
-    public String get(String key, String defaultValue) {
-      if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) {
-        return this.restServerUrl.toString();
-      } else if (key.equals(TracerId.TRACER_ID_KEY)) {
-        return TRACER_ID;
-      }
-      return defaultValue;
-    }
-  }
-
-  /**
-   * Make sure the REST server basically works.
-   * @throws Exception
-   */
-  @Test (timeout = 10000)
-  public void testBasicGet() throws Exception {
-    HTracedRESTReceiver receiver =
-      new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
-    HttpClient http = receiver.createHttpClient(60000L, 60000L);
-    http.start();
-    try {
-      // Do basic a GET /server/info against htraced
-      ContentResponse response =
-        http.GET(restServerUrl + "server/info");
-      assertEquals("application/json", response.getMediaType());
-      String content = processGET(response);
-      assertTrue(content.contains("ReleaseVersion"));
-      System.out.println(content);
-    } finally {
-      http.stop();
-      receiver.close();
-    }
-  }
-
-  private String processGET(final ContentResponse response) {
-    assertTrue("" + response.getStatus(), HttpStatus.OK_200 <= response.getStatus() &&
-      response.getStatus() <= HttpStatus.NO_CONTENT_204);
-    return response.getContentAsString();
-  }
-
-  private void testSendingSpansImpl(boolean testClose) throws Exception {
-    final HTracedRESTReceiver receiver =
-      new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
-    final int NUM_SPANS = 3;
-    final HttpClient http = receiver.createHttpClient(60000, 60000);
-    http.start();
-    Span spans[] = new Span[NUM_SPANS];
-    for (int i = 0; i < NUM_SPANS; i++) {
-      MilliSpan.Builder builder = new MilliSpan.Builder().
-          parents(new SpanId[] { new SpanId(1L, 1L) }).
-          spanId(new SpanId(1L, i));
-      if (i == NUM_SPANS - 1) {
-        builder.tracerId("specialTrid");
-      } else {
-        builder.tracerId(TestHTraceConfiguration.TRACER_ID);
-      }
-      spans[i] = builder.build();
-    }
-    try {
-      for (int i = 0; i < NUM_SPANS; i++) {
-        LOG.info("receiving " + spans[i].toString());
-        receiver.receiveSpan(spans[i]);
-      }
-      if (testClose) {
-        receiver.close();
-      } else {
-        receiver.startFlushing();
-      }
-      TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          try {
-            for (int i = 0; i < NUM_SPANS; i++) {
-              // This is what the REST server expects when querying for a
-              // span id.
-              String findSpan = String.format("span/%s",
-                  new SpanId(1L, i).toString());
-              ContentResponse response =
-                  http.GET(restServerUrl + findSpan);
-              String content = processGET(response);
-              if ((content == null) || (content.length() == 0)) {
-                LOG.info("Failed to find span " + i);
-                return false;
-              }
-              LOG.info("Got " + content + " for span " + i);
-              MilliSpan dspan = MilliSpan.fromJson(content);
-              assertEquals(new SpanId(1, i).toString(),
-                dspan.getSpanId().toString());
-              // Every span should have the tracer ID we set in the
-              // configuration... except for the last span, which had
-              // a custom value set.
-              if (i == NUM_SPANS - 1) {
-                assertEquals("specialTrid", dspan.getTracerId());
-              } else {
-                assertEquals(TestHTraceConfiguration.TRACER_ID,
-                    dspan.getTracerId());
-              }
-            }
-            return true;
-          } catch (Throwable t) {
-            LOG.error("Got exception", t);
-            return false;
-          }
-        }
-      }, 10, 20000);
-    } finally {
-      http.stop();
-      if (!testClose) {
-        receiver.close();
-      }
-    }
-  }
-
-  /**
-   * Send 100 spans then confirm they made it in.
-   * @throws Exception
-   */
-  @Test (timeout = 60000)
-  public void testSendingSpans() throws Exception {
-    testSendingSpansImpl(false);
-  }
-
-  /**
-   * Test that the REST receiver blocks during shutdown until all spans are sent
-   * (or a long timeout elapses).  Otherwise, short-lived client processes will
-   * never have a chance to send all their spans and we will have incomplete
-   * information.
-   */
-  @Test (timeout = 60000)
-  public void testShutdownBlocksUntilSpanAreSent() throws Exception {
-    testSendingSpansImpl(true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java
new file mode 100644
index 0000000..99f00a1
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java
@@ -0,0 +1,572 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.TracerId;
+import org.apache.htrace.impl.HTracedSpanReceiver.FaultInjector;
+import org.apache.htrace.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+public class TestHTracedReceiver {
+  private static final Log LOG = LogFactory.getLog(TestHTracedReceiver.class);
+
+  @BeforeClass
+  public static void beforeClass() {
+    // Allow setting really small buffer sizes for testing purposes.
+    // We do not allow setting such small sizes in production.
+    Conf.BUFFER_SIZE_MIN = 0;
+  }
+
+  @Rule
+  public TestRule watcher = new TestWatcher() {
+    protected void starting(Description description) {
+      LOG.info("*** Starting junit test: " + description.getMethodName());
+    }
+
+    protected void finished(Description description) {
+      LOG.info("*** Finished junit test: " + description.getMethodName());
+    }
+  };
+
+  @Test(timeout = 60000)
+  public void testGetServerInfoJson() throws Exception {
+    HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      String response = ht.getServerInfoJson();
+      assertTrue(response.contains("ReleaseVersion"));
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  private void waitForSpans(final HTracedProcess ht, Span[] spans)
+      throws Exception {
+    waitForSpans(ht, spans, spans.length);
+  }
+
+  /**
+   * Polls htraced until the first numSpans entries of spans can all be
+   * fetched with ht.getSpan.
+   *
+   * Span IDs that have been found are removed from the pending list, so each
+   * retry only queries the IDs that are still missing.  If the wait does not
+   * complete successfully, the IDs that are still pending are logged.
+   *
+   * @param ht        The htraced process to query.
+   * @param spans     The spans that were handed to the receiver.
+   * @param numSpans  How many spans, counted from the start of the array,
+   *                  to wait for.
+   * @throws Exception Whatever TestUtil.waitFor throws (presumably on
+   *                   timeout -- confirm against TestUtil).
+   */
+  private void waitForSpans(final HTracedProcess ht, Span[] spans,
+      int numSpans) throws Exception {
+    final LinkedList<SpanId> spanIds = new LinkedList<SpanId>();
+    for (int i = 0; i < numSpans; i++) {
+      spanIds.add(spans[i].getSpanId());
+    }
+    boolean success = false;
+    try {
+      TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          for (Iterator<SpanId> iter = spanIds.iterator();
+               iter.hasNext(); ) {
+            SpanId spanId = iter.next();
+            try {
+              if (ht.getSpan(spanId) == null) {
+                return false;
+              }
+            } catch (InterruptedException e) {
+              LOG.error("Got InterruptedException while looking for " +
+                  "span ID " + spanId, e);
+              // Restore the interrupt status and report failure.  Without
+              // this return we would fall through to iter.remove() and
+              // treat a span we never actually saw as found.
+              Thread.currentThread().interrupt();
+              return false;
+            } catch (Exception e) {
+              LOG.error("Got error looking for span ID " + spanId, e);
+              return false;
+            }
+            // Only reached when getSpan returned a span: stop polling for it.
+            iter.remove();
+          }
+          return true;
+        }
+      }, 10, 30000);
+      success = true;
+    } finally {
+      if (!success) {
+        // Report which span IDs were never found to make failures debuggable.
+        String prefix = "";
+        StringBuilder idStringBld = new StringBuilder();
+        for (Iterator<SpanId> iter = spanIds.iterator();
+             iter.hasNext(); ) {
+          idStringBld.append(prefix);
+          idStringBld.append(iter.next());
+          prefix = ",";
+        }
+        LOG.error("Unable to find span IDs " + idStringBld.toString());
+      }
+    }
+  }
+
+  /**
+   * Test that we can send spans via the HRPC interface.
+   */
+  @Test(timeout = 10000) //60000)
+  public void testSendSpansViaPacked() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testSendSpansViaPacked");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100");
+            put(Conf.ERROR_LOG_PERIOD_MS_KEY, "0");
+          }});
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+      Span[] spans = TestUtil.randomSpans(rand, 10);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      waitForSpans(ht, spans);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that when the SpanReceiver is closed, we send any spans we have
+   * buffered via the HRPC interface.
+   */
+  @Test(timeout = 60000)
+  public void testSendSpansViaPackedAndClose() throws Exception {
+    final Random rand = new Random(456);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testSendSpansViaPackedAndClose");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000");
+          }});
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+      Span[] spans = TestUtil.randomSpans(rand, 10);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      rcvr.close();
+      waitForSpans(ht, spans);
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that we can send spans via the REST interface.
+   */
+  @Test(timeout = 60000)
+  public void testSendSpansViaRest() throws Exception {
+    final Random rand = new Random(789);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testSendSpansViaRest");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100");
+          }});
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+      Span[] spans = TestUtil.randomSpans(rand, 10);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      waitForSpans(ht, spans);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that when the SpanReceiver is closed, we send any spans we have
+   * buffered via the REST interface.
+   */
+  @Test(timeout = 60000)
+  public void testSendSpansViaRestAndClose() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testSendSpansViaRestAndClose");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000");
+          }});
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
+      Span[] spans = TestUtil.randomSpans(rand, 10);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      rcvr.close();
+      waitForSpans(ht, spans);
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  private static class Mutable<T> {
+    private T t;
+
+    Mutable(T t) {
+      this.t = t;
+    }
+
+    void set(T t) {
+      this.t = t;
+    }
+
+    T get() {
+      return this.t;
+    }
+  }
+
+  private static class TestHandleContentLengthTriggerInjector
+      extends HTracedSpanReceiver.FaultInjector {
+    final Semaphore threadStartSem = new Semaphore(0);
+    int contentLengthOnTrigger = 0;
+
+    @Override
+    public synchronized void handleContentLengthTrigger(int len) {
+      contentLengthOnTrigger = len;
+    }
+    @Override
+    public void handleThreadStart() throws Exception {
+      threadStartSem.acquire();
+    }
+
+    public synchronized int getContentLengthOnTrigger() {
+      return contentLengthOnTrigger;
+    }
+  }
+
+  /**
+   * Test that filling up one of the buffers causes us to trigger a flush and
+   * start using the other buffer, when using PackedBufferManager.
+   * This also tests that PackedBufferManager can correctly handle a buffer
+   * getting full.
+   */
+  @Test(timeout = 60000)
+  public void testFullBufferCausesPackedThreadTrigger() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY,
+                "testFullBufferCausesPackedThreadTrigger");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.BUFFER_SIZE_KEY, "16384");
+            put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95");
+          }});
+      TestHandleContentLengthTriggerInjector injector =
+          new TestHandleContentLengthTriggerInjector();
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 47);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      Assert.assertTrue("The wakePostSpansThread should have been " +
+          "triggered by the spans added so far.  " +
+          "contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(),
+          injector.getContentLengthOnTrigger() > 16000);
+      injector.threadStartSem.release();
+      rcvr.close();
+      waitForSpans(ht, spans, 45);
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that filling up one of the buffers causes us to trigger a flush and
+   * start using the other buffer, when using RestBufferManager.
+   * This also tests that RestBufferManager can correctly handle a buffer
+   * getting full.
+   */
+  @Test(timeout = 60000)
+  public void testFullBufferCausesRestThreadTrigger() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY,
+                "testFullBufferCausesRestThreadTrigger");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.BUFFER_SIZE_KEY, "16384");
+            put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95");
+          }});
+      TestHandleContentLengthTriggerInjector injector =
+          new TestHandleContentLengthTriggerInjector();
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 34);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      Assert.assertTrue("The wakePostSpansThread should have been " +
+              "triggered by the spans added so far.  " +
+              "contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(),
+          injector.getContentLengthOnTrigger() > 16000);
+      injector.threadStartSem.release();
+      rcvr.close();
+      waitForSpans(ht, spans, 33);
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * A FaultInjector that causes flushes to fail until a specified
+   * number of faults have been injected.
+   */
+  private static class TestInjectFlushFaults
+      extends HTracedSpanReceiver.FaultInjector {
+    private long remainingFaults;
+
+    TestInjectFlushFaults(long remainingFaults) {
+      this.remainingFaults = remainingFaults;
+    }
+
+    @Override
+    public synchronized void handleFlush() throws IOException {
+      if (remainingFaults > 0) {
+        remainingFaults--;
+        throw new IOException("Injected IOException into flush " +
+            "code path.");
+      }
+    }
+  }
+
+  /**
+   * Test that even if the flush fails, the system stays stable and we can
+   * still close the span receiver.
+   */
+  @Test(timeout = 60000)
+  public void testPackedThreadHandlesFlushFailure() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testPackedThreadHandlesFlushFailure");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+          }});
+      TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 15);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that even if the flush fails, the system stays stable and we can
+   * still close the span receiver.
+   */
+  @Test(timeout = 60000)
+  public void testRestThreadHandlesFlushFailure() throws Exception {
+    final Random rand = new Random(321);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testRestThreadHandlesFlushFailure");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+          }});
+      TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 15);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * A FaultInjector that never fails a flush.  It releases one semaphore
+   * permit per flush so that tests can wait for a given number of flushes.
+   */
+  private static class WaitForFlushes
+      extends HTracedSpanReceiver.FaultInjector {
+    final Semaphore flushSem;
+
+    WaitForFlushes(int numFlushes) {
+      this.flushSem = new Semaphore(-numFlushes);
+    }
+
+    @Override
+    public void handleFlush() throws IOException {
+      flushSem.release();
+    }
+  }
+
+  /**
+   * Test that the packed code works when performing multiple flushes.
+   */
+  @Test(timeout = 60000)
+  public void testMultiplePackedFlushes() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testMultiplePackedFlushes");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1");
+          }});
+      WaitForFlushes injector = new WaitForFlushes(5);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 3);
+      while (true) {
+        for (Span span : spans) {
+          rcvr.receiveSpan(span);
+        }
+        if (injector.flushSem.availablePermits() >= 0) {
+          break;
+        }
+        Thread.sleep(1);
+      }
+      waitForSpans(ht, spans, 3);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that the REST code works when performing multiple flushes.
+   */
+  @Test(timeout = 60000)
+  public void testMultipleRestFlushes() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testMultipleRestFlushes");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1");
+          }});
+      WaitForFlushes injector = new WaitForFlushes(5);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 3);
+      while (true) {
+        for (Span span : spans) {
+          rcvr.receiveSpan(span);
+        }
+        if (injector.flushSem.availablePermits() >= 0) {
+          break;
+        }
+        Thread.sleep(1);
+      }
+      waitForSpans(ht, spans, 3);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that the packed code retries and delivers spans after flush errors.
+   */
+  @Test(timeout = 60000)
+  public void testPackedRetryAfterFlushError() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testPackedRetryAfterFlushError");
+            put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
+            put(Conf.PACKED_KEY, "true");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000");
+            put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100");
+          }});
+      TestInjectFlushFaults injector = new TestInjectFlushFaults(5);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 3);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      waitForSpans(ht, spans);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+
+  /**
+   * Test that the REST code retries and delivers spans after flush errors.
+   */
+  @Test(timeout = 60000)
+  public void testRestRetryAfterFlushError() throws Exception {
+    final Random rand = new Random(123);
+    final HTracedProcess ht = new HTracedProcess.Builder().build();
+    try {
+      HTraceConfiguration conf = HTraceConfiguration.fromMap(
+          new HashMap<String, String>() {{
+            put(TracerId.TRACER_ID_KEY, "testRestRetryAfterFlushError");
+            put(Conf.ADDRESS_KEY, ht.getHttpAddr());
+            put(Conf.PACKED_KEY, "false");
+            put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000");
+            put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100");
+          }});
+      TestInjectFlushFaults injector = new TestInjectFlushFaults(5);
+      HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
+      Span[] spans = TestUtil.randomSpans(rand, 3);
+      for (Span span : spans) {
+        rcvr.receiveSpan(span);
+      }
+      waitForSpans(ht, spans);
+      rcvr.close();
+    } finally {
+      ht.destroy();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java
new file mode 100644
index 0000000..bf038f1
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHTracedReceiverConf {
+  private static final Log LOG =
+      LogFactory.getLog(TestHTracedReceiverConf.class);
+
+  @Test(timeout = 60000)
+  public void testParseHostPort() throws Exception {
+    InetSocketAddress addr = new Conf(
+        HTraceConfiguration.fromKeyValuePairs(
+          Conf.ADDRESS_KEY, "example.com:8080")).endpoint;
+    Assert.assertEquals("example.com", addr.getHostName());
+    Assert.assertEquals(8080, addr.getPort());
+
+    addr = new Conf(
+        HTraceConfiguration.fromKeyValuePairs(
+          Conf.ADDRESS_KEY, "127.0.0.1:8081")).endpoint;
+    Assert.assertEquals("127.0.0.1", addr.getHostName());
+    Assert.assertEquals(8081, addr.getPort());
+
+    addr = new Conf(
+        HTraceConfiguration.fromKeyValuePairs(
+          Conf.ADDRESS_KEY, "[ff02:0:0:0:0:0:0:12]:9095")).endpoint;
+    Assert.assertEquals("ff02:0:0:0:0:0:0:12", addr.getHostName());
+    Assert.assertEquals(9095, addr.getPort());
+  }
+
+  private static void verifyFail(String hostPort) {
+    try {
+      new Conf(HTraceConfiguration.fromKeyValuePairs(
+            Conf.ADDRESS_KEY, hostPort));
+      Assert.fail("Expected bad host:port configuration " + hostPort +
+          " to fail, but it succeeded.");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testFailToParseHostPort() throws Exception {
+    verifyFail("localhost"); // no port
+    verifyFail("127.0.0.1"); // no port
+    verifyFail(":8080"); // no hostname
+    verifyFail("bob[ff02:0:0:0:0:0:0:12]:9095"); // bracket at incorrect place
+  }
+
+  @Test(timeout = 60000)
+  public void testGetIntArray() throws Exception {
+    int[] arr = Conf.getIntArray("");
+    Assert.assertEquals(0, arr.length);
+    arr = Conf.getIntArray("123");
+    Assert.assertEquals(1, arr.length);
+    Assert.assertEquals(123, arr[0]);
+    arr = Conf.getIntArray("1,2,3");
+    Assert.assertEquals(3, arr.length);
+    Assert.assertEquals(1, arr[0]);
+    Assert.assertEquals(2, arr[1]);
+    Assert.assertEquals(3, arr[2]);
+    arr = Conf.getIntArray(",-4,5,66,");
+    Assert.assertEquals(3, arr.length);
+    Assert.assertEquals(-4, arr[0]);
+    Assert.assertEquals(5, arr[1]);
+    Assert.assertEquals(66, arr[2]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java
new file mode 100644
index 0000000..ed7d904
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+
+public class TestPackedBuffer {
+  private static final Log LOG = LogFactory.getLog(TestPackedBuffer.class);
+
+  @Test(timeout = 60000)
+  public void testWriteReqFrame() throws Exception {
+    byte[] arr = new byte[PackedBuffer.HRPC_REQ_FRAME_LENGTH];
+    ByteBuffer bb = ByteBuffer.wrap(arr);
+    PackedBuffer buf = new PackedBuffer(bb);
+    PackedBuffer.writeReqFrame(bb, 1, 123, 456);
+    Assert.assertEquals(PackedBuffer.HRPC_REQ_FRAME_LENGTH, bb.position());
+    Assert.assertEquals("48 54 52 43 " +
+        "01 00 00 00 " +
+        "7b 00 00 00 00 00 00 00 " +
+        "c8 01 00 00",
+        buf.toHexString());
+  }
+
+  @Test(timeout = 60000)
+  public void testPackSpans() throws Exception {
+    Random rand = new Random(123);
+    byte[] arr = new byte[16384];
+    ByteBuffer bb = ByteBuffer.wrap(arr);
+    bb.limit(bb.capacity());
+    PackedBuffer buf = new PackedBuffer(bb);
+    final int NUM_TEST_SPANS = 5;
+    Span[] spans = new Span[NUM_TEST_SPANS];
+    for (int i = 0; i < NUM_TEST_SPANS; i++) {
+      spans[i] = TestUtil.randomSpan(rand);
+    }
+    for (int i = 0; i < NUM_TEST_SPANS; i++) {
+      buf.writeSpan(spans[i]);
+    }
+    LOG.info("wrote " + buf.toHexString());
+    MessagePack msgpack = new MessagePack(PackedBuffer.MSGPACK_CONF);
+    MessageUnpacker unpacker = msgpack.newUnpacker(arr, 0, bb.position());
+    Span[] respans = new Span[NUM_TEST_SPANS];
+    for (int i = 0; i < NUM_TEST_SPANS; i++) {
+      respans[i] = PackedBuffer.readSpan(unpacker);
+    }
+    for (int i = 0; i < NUM_TEST_SPANS; i++) {
+      Assert.assertEquals("Failed to read back span " + i,
+          spans[i].toJson(), respans[i].toJson());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java
new file mode 100644
index 0000000..630a02a
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimeUtil {
+  /**
+   * Test that our deltaMs function can compute the time difference between any
+   * two monotonic times in milliseconds.
+   */
+  @Test(timeout = 60000)
+  public void testDeltaMs() throws Exception {
+    Assert.assertEquals(0, TimeUtil.deltaMs(0, 0));
+    Assert.assertEquals(1, TimeUtil.deltaMs(0, 1));
+    Assert.assertEquals(0, TimeUtil.deltaMs(1, 0));
+    Assert.assertEquals(10, TimeUtil.deltaMs(1000, 1010));
+    long minMs = TimeUnit.MILLISECONDS.convert(Long.MIN_VALUE,
+        TimeUnit.NANOSECONDS);
+    long maxMs = TimeUnit.MILLISECONDS.convert(Long.MAX_VALUE,
+        TimeUnit.NANOSECONDS);
+    Assert.assertEquals(10, TimeUtil.deltaMs(minMs, minMs + 10));
+    Assert.assertEquals(maxMs, TimeUtil.deltaMs(minMs, maxMs));
+    Assert.assertEquals(11, TimeUtil.deltaMs(maxMs - 10, minMs));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java b/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java
deleted file mode 100644
index 74731fa..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
-/**
- * Small util for making a data directory for tests to use when running tests. We put it up at
- * target/test-data/UUID.  Create an instance of this class per unit test run and it will take
- * care of setting up the dirs for you.  Pass what is returned here as location from which to
- * have daemons and tests dump data.
- * TODO: Add close on exit.
- */
-public class DataDir {
-  private File baseTestDir = null;
-  private File testDir = null;
-
-  /**
-   * System property key to get base test directory value
-   */
-  public static final String TEST_BASE_DIRECTORY_KEY = "test.data.base.dir";
-
-  /**
-   * Default base directory for test output.
-   */
-  public static final String TEST_BASE_DIRECTORY_DEFAULT = "target";
-
-  public static final String TEST_BASE_DIRECTORY_NAME = "test-data";
-
-  /**
-   * @return Where to write test data on local filesystem; usually
-   * {@link #TEST_BASE_DIRECTORY_DEFAULT}
-   * Should not be used directly by the unit tests, hence its's private.
-   * Unit test will use a subdirectory of this directory.
-   * @see #setupDataTestDir()
-   */
-  private synchronized File getBaseTestDir() {
-    if (this.baseTestDir != null) return this.baseTestDir;
-    String testHome = System.getProperty(TEST_BASE_DIRECTORY_KEY, TEST_BASE_DIRECTORY_DEFAULT);
-    this.baseTestDir = new File(testHome, TEST_BASE_DIRECTORY_NAME);
-    return this.baseTestDir;
-  }
-
-  /**
-   * @return Absolute path to the dir created by this instance.
-   * @throws IOException 
-   */
-  public synchronized File getDataDir() throws IOException {
-    if (this.testDir != null) return this.testDir;
-    this.testDir = new File(getBaseTestDir(), UUID.randomUUID().toString());
-    if (!this.testDir.exists()) {
-      if (!this.testDir.mkdirs()) throw new IOException("Failed mkdirs for " + this.testDir);
-    }
-    // Return absolute path. A relative passed to htraced will have it create data dirs relative
-    // to its data dir rather than in it.
-    return this.testDir.getAbsoluteFile();
-  }
-
-  /**
-   * Fragile. Ugly. Presumes paths. Best we can do for now until htraced comes local to this module
-   * and is moved out of src dir.
-   * @param dataDir A datadir gotten from {@link #getDataDir()}
-   * @return Top-level of the checkout.
-   */
-  public static File getTopLevelOfCheckout(final File dataDir) {
-    // Need absolute else we run out of road when dir is relative to this module.
-    File absolute = dataDir.getAbsoluteFile();
-    // Check we are where we think we are.
-    File testDataDir = absolute.getParentFile();
-    if (!testDataDir.getName().equals(TEST_BASE_DIRECTORY_NAME)) {
-      throw new IllegalArgumentException(dataDir.toString());
-    }
-    // Do another check.
-    File targetDir = testDataDir.getParentFile();
-    if (!targetDir.getName().equals(TEST_BASE_DIRECTORY_DEFAULT)) {
-      throw new IllegalArgumentException(dataDir.toString());
-    }
-    // Back up last two dirs out of the htrace-htraced dir.
-    return targetDir.getParentFile().getParentFile();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
deleted file mode 100644
index 3e800d2..0000000
--- a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.util;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.ProcessBuilder.Redirect;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.URL;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * To get instance of HTraced up and running, create an instance of this class.
- * Upon successful construction, htraced is running using <code>dataDir</code> as directory to
- * host data (leveldbs and logs).
- * TODO: We expect to find the htraced in a very particular place. Fragile. Will break if stuff
- * moves.
- */
-public class HTracedProcess extends Process {
-  private static final Log LOG = LogFactory.getLog(HTracedProcess.class);
-  private final Process delegate;
-
-  private final String httpAddr;
-
-  /**
-   * Data send back from the HTraced process on the notification port.
-   */
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  public static class StartupNotificationData {
-    /**
-     * The hostname:port pair which the HTraced process uses for HTTP requests.
-     */
-    @JsonProperty("HttpAddr")
-    String httpAddr;
-
-    /**
-     * The process ID of the HTraced process.
-     */
-    @JsonProperty("ProcessId")
-    long processId;
-  }
-
-  public HTracedProcess(final File binPath, final File dataDir,
-                        final String host) throws IOException {
-    // Create a notifier socket bound to a random port.
-    ServerSocket listener = new ServerSocket(0);
-    boolean success = false;
-    Process process = null;
-    try {
-      // Use a random port for the web address.  No 'scheme' yet.
-      String webAddress = host + ":0";
-      String logPath = new File(dataDir, "log.txt").getAbsolutePath();
-      // Pass cmdline args to htraced to it uses our test dir for data.
-      ProcessBuilder pb = new ProcessBuilder(binPath.toString(),
-        "-Dlog.level=TRACE",
-        "-Dlog.path=" + logPath,
-        "-Dweb.address=" + webAddress,
-        "-Ddata.store.clear=true",
-        "-Dstartup.notification.address=localhost:" + listener.getLocalPort(),
-        "-Ddata.store.directories=" + dataDir.toString());
-      pb.redirectErrorStream(true);
-      // Inherit STDERR/STDOUT i/o; dumps on console for now.  Can add logs later.
-      pb.inheritIO();
-      pb.directory(dataDir);
-      //assert pb.redirectInput() == Redirect.PIPE;
-      //assert pb.redirectOutput().file() == dataDir;
-      process = pb.start();
-      assert process.getInputStream().read() == -1;
-      StartupNotificationData data = readStartupNotification(listener);
-      httpAddr = data.httpAddr;
-      LOG.info("Started htraced process " + data.processId + " with http " +
-               "address " + data.httpAddr + ", logging to " + logPath);
-      success = true;
-    } finally {
-      if (!success) {
-        // Clean up after failure
-        if (process != null) {
-          process.destroy();
-          process = null;
-        }
-      }
-      delegate = process;
-      listener.close();
-    }
-  }
-
-  private static StartupNotificationData
-      readStartupNotification(ServerSocket listener) throws IOException {
-    Socket socket = listener.accept();
-    try {
-      InputStream in = socket.getInputStream();
-      ObjectMapper objectMapper = new ObjectMapper();
-      StartupNotificationData data = objectMapper.
-          readValue(in, StartupNotificationData.class);
-      return data;
-    } finally {
-      socket.close();
-    }
-  }
-
-  public int hashCode() {
-    return delegate.hashCode();
-  }
-
-  public OutputStream getOutputStream() {
-    throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT");
-  }
-
-  public InputStream getInputStream() {
-    throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT");
-  }
-
-  public boolean equals(Object obj) {
-    return delegate.equals(obj);
-  }
-
-  public InputStream getErrorStream() {
-    throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT");
-  }
-
-  public int waitFor() throws InterruptedException {
-    return delegate.waitFor();
-  }
-
-  public int exitValue() {
-    return delegate.exitValue();
-  }
-
-  public void destroy() {
-    delegate.destroy();
-  }
-
-  public String toString() {
-    return delegate.toString();
-  }
-
-  public String getHttpAddr() {
-    return httpAddr;
-  }
-
-  /**
-   * Ugly but how else to do file-math?
-   * @param topLevel Presumes top-level of the htrace checkout.
-   * @return Path to the htraced binary.
-   */
-  public static File getPathToHTraceBinaryFromTopLevel(final File topLevel) {
-    return new File(new File(new File(new File(topLevel, "htrace-htraced"), "go"), "build"),
-      "htraced");
-  }
-}



Mime
View raw message