Return-Path: Delivered-To: apmail-avro-commits-archive@www.apache.org Received: (qmail 60554 invoked from network); 5 Aug 2010 00:08:53 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 5 Aug 2010 00:08:53 -0000 Received: (qmail 59949 invoked by uid 500); 5 Aug 2010 00:08:53 -0000 Delivered-To: apmail-avro-commits-archive@avro.apache.org Received: (qmail 59930 invoked by uid 500); 5 Aug 2010 00:08:53 -0000 Mailing-List: contact commits-help@avro.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@avro.apache.org Delivered-To: mailing list commits@avro.apache.org Received: (qmail 59922 invoked by uid 99); 5 Aug 2010 00:08:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Aug 2010 00:08:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Aug 2010 00:08:45 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DF8ED23889B2; Thu, 5 Aug 2010 00:07:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r982434 [1/2] - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/ipc/ lang/java/src/java/org/apache/avro/ipc/trace/ lang/java/src/test/java/org/apache/avro/ipc/trace/ share/schemas/org/apache/avro/ipc/trace/ Date: Thu, 05 Aug 2010 00:07:25 -0000 To: commits@avro.apache.org From: philz@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100805000725.DF8ED23889B2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: philz Date: Thu Aug 5 00:07:24 2010 New Revision: 982434 URL: http://svn.apache.org/viewvc?rev=982434&view=rev Log: AVRO-595. Add Basic Trace Collection and Propagation. 
(Patrick Wendell via philz) Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanTraceFormation.java avro/trunk/share/schemas/org/apache/avro/ipc/trace/ avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avdl avro/trunk/share/schemas/org/apache/avro/ipc/trace/avroTrace.avpr Modified: avro/trunk/CHANGES.txt avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java Modified: avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=982434&r1=982433&r2=982434&view=diff ============================================================================== --- avro/trunk/CHANGES.txt (original) +++ avro/trunk/CHANGES.txt Thu Aug 5 00:07:24 2010 @@ -13,6 +13,9 @@ Avro 1.4.0 (unreleased) NEW FEATURES + AVRO-595. Add Basic Trace Collection and Propagation. + (Patrick Wendell via philz) + AVRO-493. Add support for Hadoop Mapreduce with Avro data files. 
(cutting) AVRO-285: Specify one-way messages and implement in Java. (cutting) Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java?rev=982434&r1=982433&r2=982434&view=diff ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java (original) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java Thu Aug 5 00:07:24 2010 @@ -102,15 +102,15 @@ public abstract class Requestor { writeRequest(m.getRequest(), request, out); // write request payload List payload = bbo.getBufferList(); + writeHandshake(out); // prepend handshake if needed + META_WRITER.write(context.requestCallMeta(), out); + out.writeString(m.getName()); // write message name + context.setRequestPayload(payload); for (RPCPlugin plugin : rpcMetaPlugins) { plugin.clientSendRequest(context); // get meta-data from plugins } - writeHandshake(out); // prepend handshake if needed - META_WRITER.write(context.requestCallMeta(), out); - out.writeString(m.getName()); // write message name - bbo.append(payload); List requestBytes = bbo.getBufferList(); Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java (added) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/InMemorySpanStorage.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.avro.ipc.trace;

import java.util.LinkedList;
import java.util.List;

/**
 * Example implementation of {@link SpanStorage} which keeps spans in memory.
 *
 * This is designed as a prototype for demonstration and testing. It should
 * not be used in production settings.
 *
 * Spans are kept in insertion order. When the number of stored spans exceeds
 * the configured maximum, the oldest span (the head of the list) is evicted.
 * Every read and write of the list and of the limit happens while holding the
 * monitor of the {@code spans} list.
 */
public class InMemorySpanStorage implements SpanStorage {
  private static final long DEFAULT_MAX_SPANS = 10000;

  /** Stored spans, oldest first. Also used as the lock object. */
  protected LinkedList<Span> spans;

  /** Maximum number of spans retained; guarded by {@code spans}. */
  private long maxSpans;

  /** Create an empty storage holding at most {@code DEFAULT_MAX_SPANS} spans. */
  public InMemorySpanStorage() {
    this.spans = new LinkedList<Span>();
    this.maxSpans = DEFAULT_MAX_SPANS;
  }

  /**
   * Append a span, evicting the oldest stored span if the limit is exceeded.
   *
   * @param s the span to store
   */
  @Override
  public void addSpan(Span s) {
    synchronized (this.spans) {
      this.spans.add(s);
      trimToLimit();
    }
  }

  /**
   * Change the maximum number of retained spans and immediately evict the
   * oldest spans until the new limit is respected.
   *
   * @param maxSpans new limit; a value below one leaves the storage empty
   */
  @Override
  public void setMaxSpans(long maxSpans) {
    synchronized (this.spans) {
      // The limit is written under the same lock addSpan() reads it under,
      // so a concurrent addSpan() always observes the new value.
      this.maxSpans = maxSpans;
      trimToLimit();
    }
  }

  /**
   * Return a snapshot of the stored spans, oldest first.
   *
   * The copy is taken while holding the storage lock, so it cannot observe a
   * half-finished {@link #addSpan(Span)}. The returned list is independent of
   * the storage; modifying it does not affect stored spans.
   */
  @Override
  public List<Span> getAllSpans() {
    synchronized (this.spans) {
      return new LinkedList<Span>(this.spans);
    }
  }

  /**
   * Drop spans from the head of the list until the size limit holds.
   * Caller must hold the lock on {@code spans}.
   */
  private void trimToLimit() {
    // The isEmpty() guard keeps a negative limit from calling removeFirst()
    // on an empty list, which would throw NoSuchElementException.
    while (!this.spans.isEmpty() && this.spans.size() > this.maxSpans) {
      this.spans.removeFirst();
    }
  }
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java?rev=982434&view=auto ============================================================================== ---
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java (added) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanAggregator.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.avro.ipc.trace;

import static org.apache.avro.ipc.trace.Util.idsEqual;
import static org.apache.avro.ipc.trace.Util.longValue;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

import edu.emory.mathcs.backport.java.util.Arrays;

/**
 * Utility methods for aggregating spans together at various
 * points of trace analysis.
 */
public class SpanAggregator {
  /**
   * Class to store the results of span aggregation.
   */
  public static class SpanAggregationResults {
    /** Spans which have data from client and server. */
    public List<Span> completeSpans;

    /** Spans which have data only from client or server, or in which
     * an ID collision was detected. */
    public List<Span> incompleteSpans;

    public SpanAggregationResults() {
      completeSpans = new LinkedList<Span>();
      incompleteSpans = new LinkedList<Span>();
    }
  }

  /**
   * Class to store the results of trace formation.
   */
  public static class TraceFormationResults {
    /** Traces which were successfully created. */
    public List<Trace> traces;

    /** Spans which did not describe a complete trace. */
    public List<Span> rejectedSpans;

    public TraceFormationResults() {
      traces = new LinkedList<Trace>();
      rejectedSpans = new LinkedList<Span>();
    }
  }

  /**
   * Merge a list of incomplete spans (data filled in from only client or
   * server) into complete spans (full client and server data).
   *
   * Spans are paired by span ID. A span that already carries every
   * {@link SpanEvent} is reported as complete unchanged. When two halves with
   * the same span ID disagree on message name or parent span ID, both are
   * reported as incomplete. Spans still unpaired at the end are reported as
   * incomplete.
   *
   * Note that the second half of a matched pair is modified in place: it
   * receives the other half's host names and, when the pair covers every
   * event, the other half's events.
   *
   * @param partials spans to merge
   * @return the complete and incomplete spans found
   */
  static SpanAggregationResults getFullSpans(List<Span> partials) {
    SpanAggregationResults out = new SpanAggregationResults();
    // First halves we have seen, waiting for their counterpart.
    HashMap<Long, Span> seenSpans = new HashMap<Long, Span>();
    // EnumSet.allOf avoids the unchecked cast the Arrays.asList form needed.
    EnumSet<SpanEvent> allEvents = EnumSet.allOf(SpanEvent.class);

    for (Span s : partials) {
      EnumSet<SpanEvent> foundEvents = Util.getAllEvents(s);

      // Span has complete data already
      if (foundEvents.containsAll(allEvents)) {
        out.completeSpans.add(s);
        continue;
      }

      long spanKey = longValue(s.spanID);
      Span other = seenSpans.remove(spanKey);

      // We haven't seen other half yet
      if (other == null) {
        seenSpans.put(spanKey, s);
        continue;
      }

      // We have seen other half; make sure the two really belong together
      boolean sameCall = other.messageName.equals(s.messageName)
          && idsEqual(other.parentSpanID, s.parentSpanID);
      if (!sameCall) {
        out.incompleteSpans.add(s);
        out.incompleteSpans.add(other);
        continue;
      }

      foundEvents.addAll(Util.getAllEvents(other));
      if (other.requestorHostname != null) {
        s.requestorHostname = other.requestorHostname;
      }
      if (other.responderHostname != null) {
        s.responderHostname = other.responderHostname;
      }

      // We have a complete span between the two
      if (foundEvents.containsAll(allEvents)) {
        for (TimestampedEvent event : other.events) {
          s.events.add(event);
        }
      }
      // NOTE(review): the span is flagged complete even when the two halves
      // together do not cover every event; in that case the other half's
      // events are not copied. Behaviour kept as committed -- confirm intent.
      s.complete = true;
      out.completeSpans.add(s);
    }

    // Flush unmatched spans
    out.incompleteSpans.addAll(seenSpans.values());
    return out;
  }

  /**
   * Given a list of Spans extract as many Trace objects as possible.
   * A {@link Trace} is a tree-like data structure containing spans.
   *
   * Spans are grouped by trace ID and each group is handed to
   * {@link Trace#extractTrace(List)}. Groups for which that call returns
   * {@code null} are returned as rejected spans.
   *
   * @param spans spans to group into traces
   * @return the traces formed and the spans that could not be used
   */
  static TraceFormationResults getTraces(List<Span> spans) {
    /** Traces indexed by traceID. */
    HashMap<Long, List<Span>> traces = new HashMap<Long, List<Span>>();

    for (Span s : spans) {
      long traceKey = longValue(s.traceID);
      List<Span> members = traces.get(traceKey);
      if (members == null) {
        members = new ArrayList<Span>();
        traces.put(traceKey, members);
      }
      members.add(s);
    }

    TraceFormationResults out = new TraceFormationResults();

    for (List<Span> spanSet : traces.values()) {
      Trace trace = Trace.extractTrace(spanSet);
      if (trace != null) {
        out.traces.add(trace);
      } else {
        out.rejectedSpans.addAll(spanSet);
      }
    }
    return out;
  }
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java (added) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/SpanStorage.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.avro.ipc.trace;

import java.util.List;

/**
 * Responsible for storing spans locally and answering span queries.
 *
 * The interface currently offers three operations: adding a span, bounding
 * the number of stored spans, and listing everything that is stored.
 */
public interface SpanStorage {
  /**
   * Add a span.
   *
   * @param s the span to store
   */
  void addSpan(Span s);

  /**
   * Set the maximum number of spans to have in storage at any given time.
   *
   * @param maxSpans the new upper bound on the number of stored spans
   */
  void setMaxSpans(long maxSpans);

  /**
   * Return a list of all spans currently stored. For testing.
   *
   * @return the stored spans
   */
  List<Span> getAllSpans();
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java (added) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Trace.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.avro.ipc.trace;

import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Collections;

/**
 * A Trace is a tree of spans which reflects the actual call structure of a
 * recursive RPC call tree. Each node in a Trace represents a RPC
 * request/response pair. Each node also has zero or more child nodes.
 */
public class Trace {
  private TraceNode root;

  /**
   * Construct a trace given a root TraceNode.
   */
  public Trace(TraceNode root) {
    this.root = root;
  }

  /**
   * Return the root node of this trace.
   */
  public TraceNode getRoot() {
    return this.root;
  }

  /**
   * Provide a hash code derived from the execution path of this trace.
   *
   * This is useful for grouping several traces which represent the same
   * execution path (for instance, when we want to calculate averages for a
   * large number of identical traces).
   */
  public int executionPathHash() {
    // The brief string form depends only on the call tree, so equal call
    // trees yield equal hash codes.
    return this.printBrief().hashCode();
  }

  /**
   * Orders sibling nodes by message name, then by client send time.
   * Static because it needs no state from the enclosing Trace.
   */
  private static class NodeComparator implements Comparator<TraceNode> {
    @Override
    public int compare(TraceNode tn0, TraceNode tn1) {
      // We sort nodes alphabetically by the message name
      int byName = tn0.span.messageName.compareTo(tn1.span.messageName);
      if (byName != 0) {
        return byName;
      }
      /* NOTE:
       * If two spans containing the *same* RPC message share a parent, we
       * need a way to consistently order them. Here, we use the send time to
       * break ties. This will only work deterministically for non-async
       * clients. For asynchronous clients, aggregated statistics based on
       * this ordering may be incorrect, since we have no way to disambiguate
       * one function call from another.
       */
      long send0 = tn0.extractEventTime(tn0, SpanEvent.CLIENT_SEND);
      long send1 = tn1.extractEventTime(tn1, SpanEvent.CLIENT_SEND);
      return (send0 < send1) ? -1 : ((send0 == send1) ? 0 : 1);
    }
  }

  /**
   * Print a brief description of this trace describing the execution
   * path, but not timing data. This is for debugging or quickly profiling
   * traces.
   *
   * For instance a trace in which w calls x and y, and y calls z,
   * is encoded as:
   * (w (x) (y (z)))
   *
   * Printing sorts each node's child list in place.
   */
  public String printBrief() {
    if (this.root == null) { return "Trace: "; }
    StringBuilder out = new StringBuilder("Trace: (");
    out.append(String.valueOf(this.root.span.messageName)).append(" ");
    appendBrief(out, this.root.children);
    out.append(")");
    return out.toString();
  }

  /** Append the brief form of {@code children}, space separated, to {@code out}. */
  private void appendBrief(StringBuilder out, List<TraceNode> children) {
    // We sort so equivalent traces always print identically
    Collections.sort(children, new NodeComparator());
    int last = children.size() - 1;
    for (int i = 0; i <= last; i++) {
      TraceNode tn = children.get(i);
      out.append("(").append(String.valueOf(tn.span.messageName));
      if (tn.children.size() > 0) {
        out.append(" ");
        appendBrief(out, tn.children);
      }
      out.append(")");
      if (i != last) {
        out.append(" ");
      }
    }
  }

  /**
   * Print a description of this trace which includes timing data. This is for
   * debugging or quickly profiling traces.
   *
   * For instance a trace in which w calls x twice might print as:
   * w 87ms
   *  x 10ms
   *  x 2ms
   *
   * The figure printed for a node is its CLIENT_RECV time stamp minus its
   * CLIENT_SEND time stamp, each divided by 1000000 first. Printing sorts
   * each node's child list in place.
   */
  public String printWithTiming() {
    if (this.root == null) { return "Trace: "; }
    StringBuilder out = new StringBuilder("Trace: " + "\n");
    List<TraceNode> rootList = new LinkedList<TraceNode>();
    rootList.add(this.root);
    appendWithTiming(out, rootList, 0);
    return out.toString();
  }

  /** Append one line per node in {@code children}, indented by {@code depth}. */
  private void appendWithTiming(
      StringBuilder out, List<TraceNode> children, int depth) {
    // We sort so equivalent traces always print identically
    Collections.sort(children, new NodeComparator());
    for (TraceNode tn : children) {
      long clientSend = 0;
      long clientReceive = 0;
      for (TimestampedEvent te : tn.span.events) {
        if (te.event instanceof SpanEvent) {
          SpanEvent ev = (SpanEvent) te.event;
          if (ev.equals(SpanEvent.CLIENT_RECV)) {
            clientReceive = te.timeStamp / 1000000;
          } else if (ev.equals(SpanEvent.CLIENT_SEND)) {
            clientSend = te.timeStamp / 1000000;
          }
        }
      }

      // NOTE(review): the indent unit is one space as it appears in the
      // archived patch, whose whitespace was collapsed -- confirm width.
      for (int i = 0; i < depth; i++) { out.append(" "); } // indent
      out.append(String.valueOf(tn.span.messageName)).append(" ")
          .append(clientReceive - clientSend).append("ms\n");
      if (tn.children.size() > 0) {
        appendWithTiming(out, tn.children, depth + 1);
      }
    }
  }

  /**
   * Construct a Trace from a list of Span objects. If no such trace
   * can be created (if the list does not describe a complete trace)
   * returns null.
   *
   * The root is the span without a parent span ID; if several spans lack a
   * parent, the last one in the list is used. Spans that cannot be reached
   * from the root are not part of the returned trace. A list in which a span
   * ID is reached twice while walking down from the root (an ID collision or
   * a self-referencing span) is rejected.
   *
   * @param spans the spans of one trace
   * @return the trace, or null if none could be built
   */
  public static Trace extractTrace(List<Span> spans) {
    /** Map of span IDs to a list of child span IDs */
    HashMap<Long, List<Long>> children = new HashMap<Long, List<Long>>();

    /** Map of span IDs to spans */
    HashMap<Long, Span> spanRef = new HashMap<Long, Span>();

    /** Root span */
    Span rootSpan = null;

    for (Span s : spans) {
      long id = Util.longValue(s.spanID);
      spanRef.put(id, s);
      if (s.parentSpanID == null) {
        rootSpan = s;
        continue;
      }
      long parentId = Util.longValue(s.parentSpanID);
      List<Long> siblings = children.get(parentId);
      if (siblings == null) {
        siblings = new LinkedList<Long>();
        children.put(parentId, siblings);
      }
      siblings.add(id);
    }
    if (rootSpan == null) { // We never found a root
      return null;
    }
    TraceNode rootNode =
        getNode(rootSpan, spanRef, children, new HashSet<Long>());
    if (rootNode == null) { // the spans below the root were inconsistent
      return null;
    }
    return new Trace(rootNode);
  }

  /**
   * Recursive helper method to create a span tree.
   *
   * @param visited span IDs already placed in the tree; used to stop the
   *        unbounded recursion that colliding span IDs would otherwise cause
   * @return the node for {@code s}, or null if the subtree is invalid
   */
  private static TraceNode getNode(
      Span s, HashMap<Long, Span> spanRef,
      HashMap<Long, List<Long>> children, HashSet<Long> visited) {
    long id = Util.longValue(s.spanID);
    if (!visited.add(id)) { return null; } // ID seen twice: collision or cycle

    TraceNode out = new TraceNode();
    out.span = s;
    out.children = new LinkedList<TraceNode>();

    List<Long> kids = children.get(id);
    if (kids == null) { return out; } // no children (base case)

    for (long childID : kids) {
      Span childSpan = spanRef.get(childID);
      if (childSpan == null) { return null; } // invalid span reference
      TraceNode childNode = getNode(childSpan, spanRef, children, visited);
      // Propagate failure instead of storing a null child that would later
      // cause a NullPointerException while printing.
      if (childNode == null) { return null; }
      out.children.add(childNode);
    }
    return out;
  }
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java (added) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceClientServlet.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.avro.ipc.trace;

import java.io.IOException;
import java.io.PrintWriter;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Minimal servlet for the client-facing side of the trace plugin.
 *
 * A GET request renders a form with a text field named {@code servers}; a
 * POST request echoes the submitted value back, one input line per output
 * line.
 *
 * NOTE(review): the HTML tags inside the string literals were stripped from
 * the archived copy of this patch. The markup below is a reconstruction and
 * should be checked against the committed file.
 */
public class TraceClientServlet extends HttpServlet {
  /**
   * Echo the {@code servers} request parameter, one line at a time.
   * The parameter is request-controlled, so every line is HTML-escaped
   * before it is written into the page.
   */
  @Override
  public void doPost(HttpServletRequest request,
      HttpServletResponse response)
      throws ServletException, IOException {
    response.setContentType("text/html");
    PrintWriter out = response.getWriter();

    out.println("<title>Example</title> <body bgcolor=FFFFFF>");
    out.println("<h2>Button Clicked</h2>");

    String servers = request.getParameter("servers");

    if (servers != null) {
      // Split on the line breaks actually present in the submitted text
      // rather than on the server platform's line.separator.
      String[] parts = servers.split("\r\n|\r|\n");
      for (String p : parts) {
        out.println(escapeHtml(p) + "<br>");
      }
    } else {
      out.println("No text entered.");
    }
  }

  /**
   * Render the input form whose submission is handled by
   * {@link #doPost(HttpServletRequest, HttpServletResponse)}.
   */
  @Override
  public void doGet(HttpServletRequest request,
      HttpServletResponse response) throws IOException {
    response.setContentType("text/html");
    PrintWriter out = response.getWriter();

    out.println("<html>");
    out.println("<title>Form</title>");
    out.println("<form method='post'>");
    out.println("<textarea name='servers'></textarea>");
    out.println("<input type='submit'>");
    out.println("</form>");
    out.println("</html>");
  }

  /** Replace the characters that are significant in HTML with entities. */
  private static String escapeHtml(String text) {
    StringBuilder escaped = new StringBuilder(text.length() + 16);
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      switch (c) {
        case '&': escaped.append("&amp;"); break;
        case '<': escaped.append("&lt;"); break;
        case '>': escaped.append("&gt;"); break;
        case '"': escaped.append("&quot;"); break;
        case '\'': escaped.append("&#39;"); break;
        default: escaped.append(c);
      }
    }
    return escaped.toString();
  }
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java (added) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TraceNode.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.avro.ipc.trace;

import java.util.List;

/**
 * A node of an RPC {@link Trace}. Each node stores a {@link Span} object
 * and a list of zero or more child nodes.
 */
class TraceNode {
  /**
   * The {@link Span} which corresponds to this node in the call tree.
   */
  public Span span;

  /**
   * A list of this TraceNode's children.
   */
  public List<TraceNode> children;

  /** Create a node for {@code span} with the given child nodes. */
  public TraceNode(Span span, List<TraceNode> children) {
    this.span = span;
    this.children = children;
  }

  /** Create a node whose span and children are left unset. */
  public TraceNode() {

  }

  /**
   * Return the time stamp associated with a particular SpanEvent in the
   * given TraceNode. Return -1 if the TraceNode's Span did not contain that
   * event.
   *
   * @param tn the node whose span is searched
   * @param e the event to look for
   */
  public long extractEventTime(TraceNode tn, SpanEvent e) {
    for (TimestampedEvent te : tn.span.events) {
      boolean isSpanEvent = te.event instanceof SpanEvent;
      if (isSpanEvent && (SpanEvent) te.event == e) {
        return te.timeStamp;
      }
    }
    return -1;
  }

  /**
   * Return time delta between {@link SpanEvent#CLIENT_SEND} and
   * {@link SpanEvent#SERVER_RECV}. This may be negative or zero in the
   * case of clock skew. If either event is missing, -1 is used in its place.
   */
  public long getPreNetworkTime() {
    long clientSend = extractEventTime(this, SpanEvent.CLIENT_SEND);
    long serverReceive = extractEventTime(this, SpanEvent.SERVER_RECV);

    return serverReceive - clientSend;
  }

  /**
   * Return time delta between {@link SpanEvent#SERVER_SEND} and
   * {@link SpanEvent#CLIENT_RECV}. This may be negative or zero in the
   * case of clock skew. If either event is missing, -1 is used in its place.
   */
  public long getPostNetworkTime() {
    long serverSend = extractEventTime(this, SpanEvent.SERVER_SEND);
    long clientReceive = extractEventTime(this, SpanEvent.CLIENT_RECV);

    return clientReceive - serverSend;
  }

  /**
   * Return time delta between {@link SpanEvent#SERVER_RECV} and
   * {@link SpanEvent#SERVER_SEND}. If either event is missing, -1 is used
   * in its place.
   */
  public long getProcessTime() {
    long serverReceive = extractEventTime(this, SpanEvent.SERVER_RECV);
    long serverSend = extractEventTime(this, SpanEvent.SERVER_SEND);

    return serverSend - serverReceive;
  }
}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java (added) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePlugin.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.avro.ipc.trace; + +import java.io.IOException; +import java.net.BindException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericData; +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.ipc.HttpServer; +import org.apache.avro.ipc.RPCContext; +import org.apache.avro.ipc.RPCPlugin; +import org.apache.avro.specific.SpecificResponder; +import org.apache.avro.util.Utf8; +import org.mortbay.jetty.Server; +import org.mortbay.jetty.bio.SocketConnector; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.ServletHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A tracing plugin for Avro. + * + * This plugin traces RPC call timing and follows nested trees of RPC + * calls. To use, instantiate and add to an existing Requestor or + * Responder. If you have a Responder that itself acts as an RPC client (i.e. + * it contains a Requestor) be sure to pass the same instance of the + * plugin to both components so that propagation occurs correctly. + * + * Currently, propagation only works if each requester is performing + * serial RPC calls. That is, we cannot support the case in which the + * Requestor's request does not occur in the same thread as the Responder's + * response. + */ +public class TracePlugin extends RPCPlugin { + final private static Random RANDOM = new Random(); + private static final Logger LOG = LoggerFactory.getLogger(TracePlugin.class); + public static enum StorageType { MEMORY, DISK }; + + /* + * This plugin uses three key/value meta-data pairs. The value type for all + * of these pairs is fixed(8) and they are assumed to be encoded as 8-byte + * ID's. 
The presence of a TRACE_ID_KEY and a SPAN_ID_KEY in a message + * signals that tracing is in progress. The optional PARENT_SPAN_ID_KEY + * signals that this message has a parent node in the RPC call tree. + */ + private static final Utf8 TRACE_ID_KEY = new Utf8("traceID"); + private static final Utf8 SPAN_ID_KEY = new Utf8("spanID"); + private static final Utf8 PARENT_SPAN_ID_KEY = new Utf8("parentSpanID"); + + + class TraceResponder implements AvroTrace { + private SpanStorage spanStorage; + + public TraceResponder(SpanStorage spanStorage) { + this.spanStorage = spanStorage; + } + + @Override + public GenericArray getAllSpans() throws AvroRemoteException { + List spans = this.spanStorage.getAllSpans(); + GenericData.Array out; + synchronized (spans) { + out = new GenericData.Array(spans.size(), + Schema.createArray(Span.SCHEMA$)); + for (Span s: spans) { + out.add(s); + } + } + return out; + } + } + + private double traceProb; // Probability of starting tracing + private int port; // Port to serve tracing data + private int clientPort; // Port to expose client HTTP interface + private StorageType storageType; // How to store spans + private long maxSpans; // Max number of spans to store + private boolean enabled; // Whether to participate in tracing + + private ThreadLocal currentSpan; // span in which we are server + private ThreadLocal childSpan; // span in which we are client + + // Storage and serving of spans + protected SpanStorage storage; + protected HttpServer httpServer; + protected SpecificResponder responder; + + // Client interface + protected Server clientFacingServer; + + public TracePlugin(TracePluginConfiguration conf) throws IOException { + traceProb = conf.traceProb; + port = conf.port; + clientPort = conf.clientPort; + storageType = conf.storageType; + maxSpans = conf.maxSpans; + enabled = conf.enabled; + + // check bounds + if (!(traceProb >= 0.0 && traceProb <= 1.0)) { traceProb = 0.0; } + if (!(port > 0 && port < 65535)) { port = 51001; } + 
if (!(clientPort > 0 && clientPort < 65535)) { clientPort = 51200; } + if (maxSpans < 0) { maxSpans = 5000; } + + currentSpan = new ThreadLocal(){ + @Override protected Span initialValue(){ + return null; + } + }; + + childSpan = new ThreadLocal(){ + @Override protected Span initialValue(){ + return null; + } + }; + + if (storageType.equals("MEMORY")) { + this.storage = new InMemorySpanStorage(); + } + else { // default + this.storage = new InMemorySpanStorage(); + } + + this.storage.setMaxSpans(maxSpans); + + // Start serving span data + responder = new SpecificResponder( + AvroTrace.PROTOCOL, new TraceResponder(this.storage)); + + boolean bound = false; + + while (!bound) { + // rather than die if port is taken, try to fail over to another port. + try { + httpServer = new HttpServer(responder, this.port); + bound = true; + } catch (AvroRuntimeException e) { + if (e.getCause() instanceof BindException) { + LOG.error("Failed to bind to port: " + this.port); + this.port = this.port +1; + } else { + throw e; + } + } + } + + // Start client-facing servlet + initializeClientServer(); + } + + @Override + public void clientStartConnect(RPCContext context) { + // There are two cases in which we will need to seed a trace + // (1) If we probabilistically decide to seed a new trace + // (2) If we are part of an existing trace + + if ((this.currentSpan.get() == null) && + (RANDOM.nextFloat() < this.traceProb) && enabled) { + // Start new trace + Span span = Util.createEventlessSpan(null, null, null); + this.childSpan.set(span); + } + + if ((this.currentSpan.get() != null) && enabled) { + Span currSpan = this.currentSpan.get(); + Span span = Util.createEventlessSpan( + currSpan.traceID, null, currSpan.spanID); + this.childSpan.set(span); + } + + if (this.childSpan.get() != null) { + Span span = this.childSpan.get(); + context.requestHandshakeMeta().put( + TRACE_ID_KEY, ByteBuffer.wrap(span.traceID.bytes())); + context.requestHandshakeMeta().put( + SPAN_ID_KEY, 
ByteBuffer.wrap(span.spanID.bytes())); + if (span.parentSpanID != null) { + context.requestHandshakeMeta().put( + PARENT_SPAN_ID_KEY, ByteBuffer.wrap(span.parentSpanID.bytes())); + } + } + } + + @Override + public void serverConnecting(RPCContext context) { + Map meta = context.requestHandshakeMeta(); + // Are we being asked to propagate a trace? + if (meta.containsKey(TRACE_ID_KEY) && enabled) { + if (!(meta.containsKey(SPAN_ID_KEY))) { + LOG.warn("Span ID missing for trace " + + meta.get(TRACE_ID_KEY).toString()); + return; // should have been given full span data + } + byte[] spanIDBytes = new byte[8]; + meta.get(SPAN_ID_KEY).get(spanIDBytes); + ID spanID = new ID(); + spanID.bytes(spanIDBytes); + + ID parentSpanID = null; + if (meta.get(PARENT_SPAN_ID_KEY) != null) { + parentSpanID = new ID(); + parentSpanID.bytes(meta.get(PARENT_SPAN_ID_KEY).array()); + } + ID traceID = new ID(); + traceID.bytes(meta.get(TRACE_ID_KEY).array()); + + Span span = Util.createEventlessSpan(traceID, spanID, parentSpanID); + + span.events = new GenericData.Array( + 100, Schema.createArray(TimestampedEvent.SCHEMA$)); + this.currentSpan.set(span); + } + } + + @Override + public void clientFinishConnect(RPCContext context) { } + + @Override + public void clientSendRequest(RPCContext context) { + if (this.childSpan.get() != null) { + Span child = this.childSpan.get(); + Util.addEvent(child, SpanEvent.CLIENT_SEND); + child.messageName = new Utf8( + context.getMessage().getName()); + child.requestPayloadSize = Util.getPayloadSize( + context.getRequestPayload()); + } + } + + @Override + public void serverReceiveRequest(RPCContext context) { + if (this.currentSpan.get() != null) { + Span current = this.currentSpan.get(); + Util.addEvent(current, SpanEvent.SERVER_RECV); + current.messageName = new Utf8( + context.getMessage().getName()); + current.requestPayloadSize = Util.getPayloadSize( + context.getRequestPayload()); + } + } + + @Override + public void serverSendResponse(RPCContext 
context) { + if (this.currentSpan.get() != null) { + Span current = this.currentSpan.get(); + Util.addEvent(current, SpanEvent.SERVER_SEND); + current.responsePayloadSize = + Util.getPayloadSize(context.getResponsePayload()); + this.storage.addSpan(this.currentSpan.get()); + this.currentSpan.set(null); + } + } + + @Override + public void clientReceiveResponse(RPCContext context) { + if (this.childSpan.get() != null) { + Span child = this.childSpan.get(); + Util.addEvent(child, SpanEvent.CLIENT_RECV); + child.responsePayloadSize = + Util.getPayloadSize(context.getResponsePayload()); + this.storage.addSpan(this.childSpan.get()); + this.childSpan.set(null); + } + } + + /** + * Start a client-facing server. Can be overridden if users + * prefer to attach client Servlet to their own server. + */ + protected void initializeClientServer() { + clientFacingServer = new Server(); + Context context = new Context(clientFacingServer, "/"); + context.addServlet(new ServletHolder(new TraceClientServlet()), "/"); + boolean connected = false; + SocketConnector socket = null; + + // Keep trying ports until we can connect + while (!connected) { + try { + socket = new SocketConnector(); + socket.setPort(clientPort); + clientFacingServer.addConnector(socket); + clientFacingServer.start(); + connected = true; + } catch (Exception e) { + if (e instanceof BindException) { + clientFacingServer.removeConnector(socket); + clientPort = clientPort + 1; + continue; + } + else { + break; // Fail silently here (this is currently unused) + } + } + } + } +} Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java (added) +++ 
avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/TracePluginConfiguration.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.ipc.trace; + +import org.apache.avro.ipc.trace.TracePlugin.StorageType; + +/** + * Helper class for configuring Avro's {@link TracePlugin}. If you are using + * a common configuration module, wrap this class with your own configuration. + */ +public class TracePluginConfiguration { + public double traceProb; // Probability of starting tracing + public int port; // Port to serve tracing data + public int clientPort; // Port to expose client HTTP interface + public StorageType storageType; // How to store spans + public long maxSpans; // Max number of spans to store + public boolean enabled; // Whether or not we are active + + /** + * Return a TracePluginConfiguration with default options. 
+ */ + public TracePluginConfiguration() { + this.traceProb = 0.0; + this.port = 12335; + this.clientPort = 12345; + this.storageType = StorageType.MEMORY; + this.maxSpans = 10000; + this.enabled = true; + } +} Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java (added) +++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/trace/Util.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.ipc.trace; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Random; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; + +/** + * Utility methods for common tasks in Avro tracing. 
Mostly consists of + * static methods which we can't put in auto-generated classes. + */ +class Util { + final private static Random RANDOM = new Random(); + final private static int NANOS_PER_MILI = 1000000; + private static Utf8 hostname; + + /** + * Get all SpanEvents contained in Span s. + */ + public static EnumSet getAllEvents(Span s) { + EnumSet foundEvents = EnumSet.noneOf(SpanEvent.class); + for (TimestampedEvent event: s.events) { + if (event.event instanceof SpanEvent) { + foundEvents.add((SpanEvent) event.event); + } + } + return foundEvents; + } + + + /** + * Get the size of an RPC payload. + */ + public static int getPayloadSize(List payload) { + if (payload == null) { + return 0; + } + int size = 0; + for (ByteBuffer bb: payload) { + size = size + bb.limit(); + } + return size; + } + + /** + * Create a span without any events. If traceID or spanID is null, randomly + * generate them. If parentSpanID is null, assume this is a root span. + */ + public static Span createEventlessSpan(ID traceID, ID spanID, ID parentSpanID) { + Span span = new Span(); + span.complete = false; + + if (traceID == null) { + byte[] traceIDBytes = new byte[8]; + RANDOM.nextBytes(traceIDBytes); + span.traceID = new ID(); + span.traceID.bytes(traceIDBytes); + } else { + span.traceID = new ID(); + span.traceID.bytes(traceID.bytes().clone()); + } + + if (spanID == null) { + byte[] spanIDBytes = new byte[8]; + RANDOM.nextBytes(spanIDBytes); + span.spanID = new ID(); + span.spanID.bytes(spanIDBytes); + } else { + span.spanID = new ID(); + span.spanID.bytes(spanID.bytes().clone()); + } + + if (parentSpanID != null) { + span.parentSpanID = new ID(); + span.parentSpanID.bytes(parentSpanID.bytes().clone()); + } + + span.events = new GenericData.Array( + 10, Schema.createArray(TimestampedEvent.SCHEMA$)); + + if (hostname == null) { + try { + hostname = new Utf8(InetAddress.getLocalHost().toString()); + } catch (UnknownHostException e) { + hostname = new Utf8("Unknown"); + } + } + 
span.requestorHostname = hostname; + return span; + } + + /** + * Add a TimestampedEvent to a Span using the current time. + */ + public static void addEvent(Span span, SpanEvent eventType) { + TimestampedEvent ev = new TimestampedEvent(); + ev.event = eventType; + ev.timeStamp = System.currentTimeMillis() * NANOS_PER_MILI; + span.events.add(ev); + } + + /** + * Get the long value from a given ID object. + */ + public static long longValue(ID in) { + if (in == null) { + throw new IllegalArgumentException("ID cannot be null"); + } + if (in.bytes() == null) { + throw new IllegalArgumentException("ID cannot be empty"); + } + if (in.bytes().length != 8) { + throw new IllegalArgumentException("ID must be 8 bytes"); + } + ByteBuffer buff = ByteBuffer.wrap(in.bytes()); + return buff.getLong(); + } + + /** + * Get an ID associated with a given long value. + */ + public static ID idValue(long in) { + byte[] bArray = new byte[8]; + ByteBuffer bBuffer = ByteBuffer.wrap(bArray); + LongBuffer lBuffer = bBuffer.asLongBuffer(); + lBuffer.put(0, in); + ID out = new ID(); + out.bytes(bArray); + return out; + } + + /** + * Verify the equality of ID objects. Both being null references is + * considered equal. 
+ */ + public static boolean idsEqual(ID a, ID b) { + if (a == null && b == null) { return true; } + if (a == null || b == null) { return false; } + + return Arrays.equals(a.bytes(), b.bytes()); + } + +} Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java (added) +++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestBasicTracing.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.avro.ipc.trace; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Random; + +import org.apache.avro.Protocol; +import org.apache.avro.Protocol.Message; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRequestor; +import org.apache.avro.generic.GenericResponder; +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.ipc.HttpServer; +import org.apache.avro.ipc.HttpTransceiver; +import org.apache.avro.ipc.RPCPlugin; +import org.apache.avro.ipc.Responder; +import org.junit.Test; +import org.mortbay.log.Log; + +public class TestBasicTracing { + Protocol protocol = Protocol.parse("" + "{\"protocol\": \"Minimal\", " + + "\"messages\": { \"m\": {" + + " \"request\": [{\"name\": \"x\", \"type\": \"int\"}], " + + " \"response\": \"int\"} } }"); + Message message = protocol.getMessages().get("m"); + + /** Expects 0 and returns 1. 
*/ + static class TestResponder extends GenericResponder { + public TestResponder(Protocol local) { + super(local); + } + + @Override + public Object respond(Message message, Object request) + throws AvroRemoteException { + assertEquals(0, ((GenericRecord) request).get("x")); + return 1; + } + } + + @Test + public void testBasicTrace() throws Exception { + TracePluginConfiguration conf = new TracePluginConfiguration(); + conf.port = 51007; + conf.clientPort = 12344; + conf.traceProb = 1.0; + TracePlugin responderPlugin = new TracePlugin(conf); + conf.port = 51008; + conf.clientPort = 12345; + TracePlugin requestorPlugin = new TracePlugin(conf); + + Responder res = new TestResponder(protocol); + res.addRPCPlugin(responderPlugin); + + HttpServer server = new HttpServer(res, 50000); + server.start(); + + HttpTransceiver trans = new HttpTransceiver( + new URL("http://localhost:50000")); + + GenericRequestor r = new GenericRequestor(protocol, trans); + r.addRPCPlugin(requestorPlugin); + + GenericRecord params = new GenericData.Record(protocol.getMessages().get( + "m").getRequest()); + params.put("x", 0); + r.request("m", params); + + List responderSpans = responderPlugin.storage.getAllSpans(); + assertEquals(1, responderSpans.size()); + + List requestorSpans = requestorPlugin.storage.getAllSpans(); + assertEquals(1, requestorSpans.size()); + + if ((responderSpans.size() == 1 && requestorSpans.size() == 1)) { + Span responderSpan = responderSpans.get(0); + Span requestorSpan = requestorSpans.get(0); + + // Check meta propagation + assertEquals(null, requestorSpan.parentSpanID); + assertEquals(responderSpan.parentSpanID, requestorSpan.parentSpanID); + assertEquals(responderSpan.traceID, requestorSpan.traceID); + + // Check other data + assertEquals(2, requestorSpan.events.size()); + assertEquals(2, responderSpan.events.size()); + assertTrue("m".equals(requestorSpan.messageName.toString())); + assertTrue("m".equals(responderSpan.messageName.toString())); + 
assertFalse(requestorSpan.complete); + assertFalse(responderSpan.complete); + } + + server.close(); + + requestorPlugin.clientFacingServer.stop(); + requestorPlugin.httpServer.close(); + + responderPlugin.clientFacingServer.stop(); + responderPlugin.httpServer.close(); + } + + /* + * Test a more complicated, recursive trace involving four agents and three + * spans. + * + * Messages are w, x, y which request/return + * incrementing int values (shown below). + * + * |-w-(1)-> | | + * | |-x-(2)-> | C + * | | <-x-(3)-| + * | | + * A | B | + * | | | + * | |-x-(4)-> | + * | | <-x-(5)-| D + * | | | + * |<-w-(6)- | | + * + * Listening ports are B: 21005 + * C: 21006 + * D: 21007 + */ + + Protocol advancedProtocol = Protocol.parse("{\"protocol\": \"Advanced\", " + + "\"messages\": { " + + "\"w\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], " + + " \"response\": \"int\"}," + + "\"x\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], " + + " \"response\": \"int\"}," + + "\"y\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], " + + " \"response\": \"int\"}" + + " } }"); + + /** Middle Responder */ + static class RecursingResponder extends GenericResponder { + HttpTransceiver transC; + HttpTransceiver transD; + GenericRequestor reqC; + GenericRequestor reqD; + Protocol protocol; + + public RecursingResponder(Protocol local, RPCPlugin plugin) + throws Exception { + super(local); + transC = new HttpTransceiver( + new URL("http://localhost:21006")); + transD = new HttpTransceiver( + new URL("http://localhost:21007")); + reqC = new GenericRequestor(local, transC); + reqC.addRPCPlugin(plugin); + reqD = new GenericRequestor(local, transD); + reqD.addRPCPlugin(plugin); + + protocol = local; + } + + @Override + public Object respond(Message message, Object request) + throws IOException { + assertTrue("w".equals(message.getName())); + GenericRecord inParams = (GenericRecord)request; + Integer currentCount = (Integer) inParams.get("req"); + 
assertTrue(currentCount.equals(1)); + + GenericRecord paramsC = new GenericData.Record( + protocol.getMessages().get("x").getRequest()); + paramsC.put("req", currentCount + 1); + Integer returnC = (Integer) reqC.request("x", paramsC); + assertTrue(returnC.equals(currentCount + 2)); + + GenericRecord paramsD = new GenericData.Record( + protocol.getMessages().get("x").getRequest()); + paramsD.put("req", currentCount + 3); + Integer returnD = (Integer) reqD.request("x", paramsD); + assertTrue(returnD.equals(currentCount + 4)); + + return currentCount + 5; + } + } + + /** Endpoint responder */ + static class EndpointResponder extends GenericResponder { + public EndpointResponder(Protocol local) { + super(local); + } + + @Override + public Object respond(Message message, Object request) + throws AvroRemoteException { + GenericRecord inParams = (GenericRecord)request; + Integer currentCount = (Integer) inParams.get("req"); + + return currentCount + 1; + } + } + + @Test + public void testRecursingTrace() throws Exception { + TracePluginConfiguration conf = new TracePluginConfiguration(); + conf.traceProb = 1.0; + conf.port = 51010; + conf.clientPort = 12346; + TracePlugin aPlugin = new TracePlugin(conf); + conf.port = 51011; + conf.clientPort = 12347; + TracePlugin bPlugin = new TracePlugin(conf); + conf.port = 51012; + conf.clientPort = 12348; + TracePlugin cPlugin = new TracePlugin(conf); + conf.port = 51013; + conf.clientPort = 12349; + TracePlugin dPlugin = new TracePlugin(conf); + + // Responders + Responder bRes = new RecursingResponder(advancedProtocol, bPlugin); + bRes.addRPCPlugin(bPlugin); + HttpServer server1 = new HttpServer(bRes, 21005); + server1.start(); + + Responder cRes = new EndpointResponder(advancedProtocol); + cRes.addRPCPlugin(cPlugin); + HttpServer server2 = new HttpServer(cRes, 21006); + server2.start(); + + Responder dRes = new EndpointResponder(advancedProtocol); + dRes.addRPCPlugin(dPlugin); + HttpServer server3 = new HttpServer(dRes, 21007); + 
server3.start(); + + // Root requestor + HttpTransceiver trans = new HttpTransceiver( + new URL("http://localhost:21005")); + + GenericRequestor r = new GenericRequestor(advancedProtocol, trans); + r.addRPCPlugin(aPlugin); + + GenericRecord params = new GenericData.Record( + advancedProtocol.getMessages().get("w").getRequest()); + params.put("req", 1); + r.request("w", params); + + // Verify counts + assertEquals(1, aPlugin.storage.getAllSpans().size()); + assertEquals(3, bPlugin.storage.getAllSpans().size()); + assertEquals(1, cPlugin.storage.getAllSpans().size()); + assertEquals(1, dPlugin.storage.getAllSpans().size()); + + ID traceID = aPlugin.storage.getAllSpans().get(0).traceID; + ID rootSpanID = null; + + // Verify event counts and trace ID propagation + for (Span s: aPlugin.storage.getAllSpans()) { + assertEquals(2, s.events.size()); + assertTrue(Util.idsEqual(traceID, s.traceID)); + assertFalse(s.complete); + rootSpanID = s.spanID; + } + + for (Span s: bPlugin.storage.getAllSpans()) { + assertEquals(2, s.events.size()); + assertEquals(traceID, s.traceID); + assertFalse(s.complete); + } + + for (Span s: cPlugin.storage.getAllSpans()) { + assertEquals(2, s.events.size()); + assertEquals(traceID, s.traceID); + assertFalse(s.complete); + } + for (Span s: dPlugin.storage.getAllSpans()) { + assertEquals(2, s.events.size()); + assertEquals(traceID, s.traceID); + assertFalse(s.complete); + } + + // Verify span propagation. 
+ ID firstSpanID = aPlugin.storage.getAllSpans().get(0).spanID; + ID secondSpanID = cPlugin.storage.getAllSpans().get(0).spanID; + ID thirdSpanID = dPlugin.storage.getAllSpans().get(0).spanID; + + boolean firstFound = false, secondFound = false, thirdFound = false; + for (Span s: bPlugin.storage.getAllSpans()) { + if (Util.idsEqual(s.spanID, firstSpanID)) { + firstFound = true; + } + else if (Util.idsEqual(s.spanID, secondSpanID)) { + secondFound = true; + } + else if (Util.idsEqual(s.spanID, thirdSpanID)) { + thirdFound = true; + } + } + assertTrue(firstFound); + assertTrue(secondFound); + assertTrue(thirdFound); + + server1.close(); + server2.close(); + server3.close(); + aPlugin.httpServer.close(); + aPlugin.clientFacingServer.stop(); + bPlugin.httpServer.close(); + bPlugin.clientFacingServer.stop(); + cPlugin.httpServer.close(); + cPlugin.clientFacingServer.stop(); + dPlugin.httpServer.close(); + dPlugin.clientFacingServer.stop(); + + } + + /** Sleeps as requested. */ + private static class SleepyResponder extends GenericResponder { + public SleepyResponder(Protocol local) { + super(local); + } + + @Override + public Object respond(Message message, Object request) + throws AvroRemoteException { + try { + Thread.sleep((Long)((GenericRecord)request).get("millis")); + } catch (InterruptedException e) { + throw new AvroRemoteException(e); + } + return null; + } + } + + /** + * Demo program for using RPC trace. This automatically generates + * client RPC requests. 
+ * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + if (args.length == 0) { + args = new String[] { "7002", "7003" }; + } + Protocol protocol = Protocol.parse("{\"protocol\": \"sleepy\", " + + "\"messages\": { \"sleep\": {" + + " \"request\": [{\"name\": \"millis\", \"type\": \"long\"}," + + "{\"name\": \"data\", \"type\": \"bytes\"}], " + + " \"response\": \"null\"} } }"); + Log.info("Using protocol: " + protocol.toString()); + Responder r = new SleepyResponder(protocol); + TracePlugin p = new TracePlugin(new TracePluginConfiguration()); + r.addRPCPlugin(p); + + // Start Avro server + new HttpServer(r, Integer.parseInt(args[0])); + + HttpTransceiver trans = new HttpTransceiver( + new URL("http://localhost:" + Integer.parseInt(args[0]))); + GenericRequestor req = new GenericRequestor(protocol, trans); + TracePluginConfiguration clientConf = new TracePluginConfiguration(); + clientConf.clientPort = 12346; + clientConf.port = 12336; + clientConf.traceProb = 1.0; + req.addRPCPlugin(new TracePlugin(clientConf)); + + while(true) { + Thread.sleep(1000); + GenericRecord params = new GenericData.Record(protocol.getMessages().get( + "sleep").getRequest()); + Random rand = new Random(); + params.put("millis", Math.abs(rand.nextLong()) % 1000); + int payloadSize = Math.abs(rand.nextInt()) % 10000; + byte[] payload = new byte[payloadSize]; + rand.nextBytes(payload); + params.put("data", ByteBuffer.wrap(payload)); + req.request("sleep", params); + } + } +} Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java?rev=982434&view=auto ============================================================================== --- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java (added) +++ 
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestEndToEndTracing.java Thu Aug 5 00:07:24 2010 @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.ipc.trace; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; + +import org.apache.avro.Protocol; +import org.apache.avro.Protocol.Message; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRequestor; +import org.apache.avro.generic.GenericResponder; +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.ipc.HttpServer; +import org.apache.avro.ipc.HttpTransceiver; +import org.apache.avro.ipc.RPCPlugin; +import org.apache.avro.ipc.Responder; +import org.apache.avro.ipc.trace.SpanAggregator.SpanAggregationResults; +import org.apache.avro.ipc.trace.SpanAggregator.TraceFormationResults; +import org.junit.Test; + +/** + * Tests which test logging, aggregation, and analysis of traces. 
+ */ +public class TestEndToEndTracing { + + + /* + * Messages are w, x, y which request/return + * incrementing int values (shown below). + * + * |-w-(1)-> | | + * | |-x-(2)-> | C + * | | <-x-(3)-| + * | | + * A | B | + * | | | + * | |-x-(4)-> | + * | | <-x-(5)-| D + * | | | + * |<-w-(6)- | | + * + * Listening ports are B: 21005 + * C: 21006 + * D: 21007 + */ + + Protocol advancedProtocol = Protocol.parse("{\"protocol\": \"Advanced\", " + + "\"messages\": { " + + "\"w\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], " + + " \"response\": \"int\"}," + + "\"x\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], " + + " \"response\": \"int\"}," + + "\"y\": { \"request\": [{\"name\": \"req\", \"type\": \"int\"}], " + + " \"response\": \"int\"}" + + " } }"); + + /** Middle Responder */ + static class RecursingResponder extends GenericResponder { + HttpTransceiver transC; + HttpTransceiver transD; + GenericRequestor reqC; + GenericRequestor reqD; + Protocol protocol; + + public RecursingResponder(Protocol local, RPCPlugin plugin) + throws Exception { + super(local); + transC = new HttpTransceiver( + new URL("http://localhost:21006")); + transD = new HttpTransceiver( + new URL("http://localhost:21007")); + reqC = new GenericRequestor(local, transC); + reqC.addRPCPlugin(plugin); + reqD = new GenericRequestor(local, transD); + reqD.addRPCPlugin(plugin); + + protocol = local; + } + + @Override + public Object respond(Message message, Object request) + throws IOException { + assertTrue("w".equals(message.getName())); + GenericRecord inParams = (GenericRecord)request; + Integer currentCount = (Integer) inParams.get("req"); + assertTrue(currentCount.equals(1)); + + GenericRecord paramsC = new GenericData.Record( + protocol.getMessages().get("x").getRequest()); + paramsC.put("req", currentCount + 1); + Integer returnC = (Integer) reqC.request("x", paramsC); + assertTrue(returnC.equals(currentCount + 2)); + + GenericRecord paramsD = new GenericData.Record( + 
protocol.getMessages().get("x").getRequest());
+      paramsD.put("req", currentCount + 3);
+      Integer returnD = (Integer) reqD.request("x", paramsD);
+      assertTrue(returnD.equals(currentCount + 4));
+
+      return currentCount + 5;
+    }
+  }
+
+  /**
+   * Endpoint responder: answers every message with the integer "req"
+   * parameter of the request plus one and issues no further RPCs itself.
+   */
+  static class EndpointResponder extends GenericResponder {
+    public EndpointResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+        throws AvroRemoteException {
+      GenericRecord inParams = (GenericRecord)request;
+      Integer currentCount = (Integer) inParams.get("req");
+
+      return currentCount + 1;
+    }
+  }
+
+  /**
+   * Starts a RecursingResponder on port 21005 and two EndpointResponders on
+   * ports 21006 and 21007, each with its own TracePlugin, then issues a
+   * single "w" request from a traced root requestor. The spans stored by all
+   * four plugins are merged and must form exactly one trace, with no
+   * rejected spans, whose printed forms mention "w" once and "x" at least
+   * twice.
+   *
+   * All servers are shut down in a finally block so that a failed assertion
+   * cannot leave the fixed ports bound for the rest of the test run.
+   */
+  @Test
+  public void testTraceAndCollection() throws Exception {
+    // The same configuration object is re-used; only the ports are changed
+    // before each plugin is constructed.
+    TracePluginConfiguration conf = new TracePluginConfiguration();
+    conf.traceProb = 1.0;
+    conf.port = 51010;
+    conf.clientPort = 12346;
+    TracePlugin aPlugin = new TracePlugin(conf);
+    conf.port = 51011;
+    conf.clientPort = 12347;
+    TracePlugin bPlugin = new TracePlugin(conf);
+    conf.port = 51012;
+    conf.clientPort = 12348;
+    TracePlugin cPlugin = new TracePlugin(conf);
+    conf.port = 51013;
+    conf.clientPort = 12349;
+    TracePlugin dPlugin = new TracePlugin(conf);
+
+    // Responders
+    Responder bRes = new RecursingResponder(advancedProtocol, bPlugin);
+    bRes.addRPCPlugin(bPlugin);
+    HttpServer server1 = new HttpServer(bRes, 21005);
+    server1.start();
+
+    Responder cRes = new EndpointResponder(advancedProtocol);
+    cRes.addRPCPlugin(cPlugin);
+    HttpServer server2 = new HttpServer(cRes, 21006);
+    server2.start();
+
+    Responder dRes = new EndpointResponder(advancedProtocol);
+    dRes.addRPCPlugin(dPlugin);
+    HttpServer server3 = new HttpServer(dRes, 21007);
+    server3.start();
+
+    try {
+      // Root requestor
+      HttpTransceiver trans = new HttpTransceiver(
+          new URL("http://localhost:21005"));
+
+      GenericRequestor r = new GenericRequestor(advancedProtocol, trans);
+      r.addRPCPlugin(aPlugin);
+
+      GenericRecord params = new GenericData.Record(
+          advancedProtocol.getMessages().get("w").getRequest());
+      params.put("req", 1);
+      r.request("w", params);
+
+      // Gather the partial spans recorded by every participant.
+      ArrayList<Span> allSpans = new ArrayList<Span>();
+
+      allSpans.addAll(aPlugin.storage.getAllSpans());
+      allSpans.addAll(bPlugin.storage.getAllSpans());
+      allSpans.addAll(cPlugin.storage.getAllSpans());
+      allSpans.addAll(dPlugin.storage.getAllSpans());
+
+      SpanAggregationResults results = SpanAggregator.getFullSpans(allSpans);
+
+      TraceFormationResults traces = SpanAggregator.getTraces(results.completeSpans);
+
+      assertEquals(1, traces.traces.size());
+      assertEquals(0, traces.rejectedSpans.size());
+
+      // Test debug printing of traces
+      String string1 = traces.traces.get(0).printWithTiming();
+      assertTrue(string1.contains("w"));
+      assertTrue(string1.contains("x"));
+      assertTrue(string1.indexOf("x") != string1.lastIndexOf("x")); // assure two x's
+
+      String string2 = traces.traces.get(0).printBrief();
+      assertTrue(string2.contains("w"));
+      assertTrue(string2.contains("x"));
+      assertTrue(string2.indexOf("x") != string2.lastIndexOf("x")); // assure two x's
+
+      // Just for fun, print to console
+      System.out.println(traces.traces.get(0).printWithTiming());
+      System.out.println(traces.traces.get(0).printBrief());
+    } finally {
+      // Runs on success and on failure alike, in the original shutdown order.
+      server1.close();
+      server2.close();
+      server3.close();
+      aPlugin.httpServer.close();
+      aPlugin.clientFacingServer.stop();
+      bPlugin.httpServer.close();
+      bPlugin.clientFacingServer.stop();
+      cPlugin.httpServer.close();
+      cPlugin.clientFacingServer.stop();
+      dPlugin.httpServer.close();
+      dPlugin.clientFacingServer.stop();
+    }
+  }
+
+  /** Sleeps as requested.
*/
+  private static class SleepyResponder extends GenericResponder {
+    public SleepyResponder(Protocol local) {
+      super(local);
+    }
+
+    /**
+     * Blocks for the number of milliseconds given by the "millis" field of
+     * the request record, then returns null.
+     *
+     * @throws AvroRemoteException wrapping the InterruptedException if the
+     *         sleep is interrupted
+     */
+    @Override
+    public Object respond(Message message, Object request)
+      throws AvroRemoteException {
+      try {
+        Thread.sleep((Long)((GenericRecord)request).get("millis"));
+      } catch (InterruptedException e) {
+        // Catching InterruptedException clears the interrupt flag; set it
+        // again so code further up the stack can still observe the interrupt.
+        Thread.currentThread().interrupt();
+        throw new AvroRemoteException(e);
+      }
+      return null;
+    }
+  }
+}

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java?rev=982434&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/trace/TestSpanAggregation.java Thu Aug 5 00:07:24 2010
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.apache.avro.ipc.trace.Util.idValue;
+import static org.apache.avro.ipc.trace.Util.idsEqual;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.ipc.trace.SpanAggregator.SpanAggregationResults;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/**
+ * Tests for span aggregation utility methods.
+ */
+public class TestSpanAggregation {
+  /**
+   * Test merging of two basic spans: the client half and the server half of
+   * the same span (trace 1, span 1, no parent, message "a") must be merged
+   * into a single complete span carrying all four events.
+   */
+  @Test
+  public void testSpanCompletion1() {
+    Span span1a = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    Span span1b = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+
+    List<Span> partials = new ArrayList<Span>();
+    partials.add(span1a);
+    partials.add(span1b);
+    SpanAggregationResults results = SpanAggregator.getFullSpans(partials);
+
+    assertNotNull(results.completeSpans);
+    assertNotNull(results.incompleteSpans);
+    assertEquals(0, results.incompleteSpans.size());
+    assertEquals(1, results.completeSpans.size());
+
+    Span result = results.completeSpans.get(0);
+    assertNull(result.parentSpanID);
+    assertTrue(idsEqual(idValue(1), result.spanID));
+    // Two client-side events plus two server-side events.
+    assertEquals(4, result.events.size());
+  }
+
+  /**
+   * Test span merging with some extra invalid spans. Only the matching pair
+   * for span 1 may be reported as complete; the five other partial spans
+   * must all be reported as incomplete.
+   */
+  @Test
+  public void testInvalidSpanCompletion() {
+    // Trace: 1, Span: 1, Parent: null
+    Span span1a = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    Span span1b = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+
+    // Trace: 1, Span: 10, Parent: 3 -- the two halves disagree on message name
+    Span spanBogus1 = createClientSpan(idValue(1), idValue(10), idValue(3), new Utf8("not"));
+    Span spanBogus2 = createServerSpan(idValue(1), idValue(10), idValue(3), new Utf8("equal"));
+
+    // Trace: 1, Span: 5, Parent: (2/3) -- the two halves disagree on parent
+    Span spanBogus3 = createClientSpan(idValue(1), idValue(5), idValue(2), new Utf8("equal"));
+    Span spanBogus4 = createServerSpan(idValue(1), idValue(5), idValue(3), new Utf8("equal"));
+
+    // Trace: 1, Span: 4, Parent: 1 -- client half only, no server half
+    Span spanBogus5 = createClientSpan(idValue(1), idValue(4), idValue(1), new Utf8("alone"));
+
+    List<Span> partials = new ArrayList<Span>();
+    partials.add(span1a);
+    partials.add(span1b);
+    partials.add(spanBogus1);
+    partials.add(spanBogus2);
+    partials.add(spanBogus3);
+    partials.add(spanBogus4);
+    partials.add(spanBogus5);
+
+    SpanAggregationResults results = SpanAggregator.getFullSpans(partials);
+    assertNotNull(results.completeSpans);
+    assertNotNull(results.incompleteSpans);
+
+    assertEquals(5, results.incompleteSpans.size());
+    assertTrue(results.incompleteSpans.contains(spanBogus1));
+    assertTrue(results.incompleteSpans.contains(spanBogus2));
+    assertTrue(results.incompleteSpans.contains(spanBogus3));
+    assertTrue(results.incompleteSpans.contains(spanBogus4));
+    assertTrue(results.incompleteSpans.contains(spanBogus5));
+
+    assertEquals(1, results.completeSpans.size());
+    Span result = results.completeSpans.get(0);
+    assertTrue(result.complete);
+    assertTrue(idsEqual(idValue(1), result.spanID));
+    assertEquals(new Utf8("requestorHostname"), result.requestorHostname);
+    assertEquals(new Utf8("responderHostname"), result.responderHostname);
+    assertNull(result.parentSpanID);
+    assertEquals(new Utf8("a"), result.messageName);
+  }
+
+  /**
+   * Test basic formation of a trace.
+   *
+   * Call tree built from the parent ids used below (a is the root, b is
+   * called by a, c and d are called by b, e is called by d):
+   *
+   *      a
+   *      |
+   *      b
+   *     / \
+   *    c   d
+   *        |
+   *        e
+   */
+  @Test
+  public void testTraceFormation1() {
+    // Each call is represented by a client half and a server half that share
+    // trace id 1, a span id and a parent span id.
+    Span a1 = createClientSpan(idValue(1), idValue(1), null, new Utf8("a"));
+    Span a2 = createServerSpan(idValue(1), idValue(1), null, new Utf8("a"));
+
+    Span b1 = createClientSpan(idValue(1), idValue(2), idValue(1), new Utf8("b"));
+    Span b2 = createServerSpan(idValue(1), idValue(2), idValue(1), new Utf8("b"));
+
+    Span c1 = createClientSpan(idValue(1), idValue(3), idValue(2), new Utf8("c"));
+    Span c2 = createServerSpan(idValue(1), idValue(3), idValue(2), new Utf8("c"));
+
+    Span d1 = createClientSpan(idValue(1), idValue(4), idValue(2), new Utf8("d"));
+    Span d2 = createServerSpan(idValue(1), idValue(4), idValue(2), new Utf8("d"));
+
+    Span e1 = createClientSpan(idValue(1), idValue(5), idValue(4), new Utf8("e"));
+    Span e2 = createServerSpan(idValue(1), idValue(5), idValue(4), new Utf8("e"));
+
+    List<Span> spans = new LinkedList<Span>();
+    spans.addAll(Arrays.asList(new Span[] {a1, a2, b1, b2, c1, c2, d1, d2, e1, e2}));
+
+    List<Span> merged = SpanAggregator.getFullSpans(spans).completeSpans;
+
+    // Ten partial spans collapse into one complete span per call.
+    assertEquals(5, merged.size());
+    for (Span s: merged) {
+      assertEquals(new Utf8("requestorHostname"), s.requestorHostname);
+      assertEquals(new Utf8("responderHostname"), s.responderHostname);
+    }
+
+    List<Trace> traces = SpanAggregator.getTraces(merged).traces;
+    assertEquals(1, traces.size());
+
+    assertEquals("Trace: (a (b (c) (d (e))))", traces.get(0).printBrief());
+
+  }
+
+  /**
+   * Make a mock Span including client-side timing data.
+   * The span is marked incomplete, carries the fixed requestor hostname
+   * "requestorHostname" and holds a CLIENT_SEND and a CLIENT_RECV event.
+   *
+   * @param traceID id of the trace the span belongs to
+   * @param spanID id of the span itself
+   * @param parentID parent span id, or null to leave parentSpanID unset
+   * @param msgName message name recorded on the span
+   * @return the newly built partial span
+   */
+  public Span createClientSpan(ID traceID, ID spanID, ID parentID, Utf8 msgName) {
+    Span out = new Span();
+    out.spanID = spanID;
+    out.traceID = traceID;
+    out.requestorHostname = new Utf8("requestorHostname");
+
+    if (parentID != null) {
+      out.parentSpanID = parentID;
+    }
+    out.messageName = msgName;
+    out.complete = false;
+
+    TimestampedEvent event1 = new TimestampedEvent();
+    event1.event = SpanEvent.CLIENT_SEND;
+    event1.timeStamp = System.currentTimeMillis() * 1000000;
+
+    TimestampedEvent event2 = new TimestampedEvent();
+    event2.event = SpanEvent.CLIENT_RECV;
+    event2.timeStamp = System.currentTimeMillis() * 1000000;
+
+    out.events = new GenericData.Array<TimestampedEvent>(
+        2, Schema.createArray(TimestampedEvent.SCHEMA$));
+    out.events.add(event1);
+    out.events.add(event2);
+
+    return out;
+  }
+
+  /**
+   * Make a mock Span including server-side timing data.
+   * The span is marked incomplete, carries the fixed responder hostname
+   * "responderHostname" and holds a SERVER_RECV and a SERVER_SEND event.
+   * Time stamps use the same scaling (milliseconds * 1000000) as
+   * createClientSpan, so both halves of a merged span share one scale.
+   *
+   * @param traceID id of the trace the span belongs to
+   * @param spanID id of the span itself
+   * @param parentID parent span id, or null to leave parentSpanID unset
+   * @param msgName message name recorded on the span
+   * @return the newly built partial span
+   */
+  public Span createServerSpan(ID traceID, ID spanID, ID parentID, Utf8 msgName) {
+    Span out = new Span();
+    out.spanID = spanID;
+    out.traceID = traceID;
+    out.responderHostname = new Utf8("responderHostname");
+
+    if (parentID != null) {
+      out.parentSpanID = parentID;
+    }
+    out.messageName = msgName;
+    out.complete = false;
+
+    TimestampedEvent event1 = new TimestampedEvent();
+    event1.event = SpanEvent.SERVER_RECV;
+    event1.timeStamp = System.currentTimeMillis() * 1000000;
+
+    TimestampedEvent event2 = new TimestampedEvent();
+    event2.event = SpanEvent.SERVER_SEND;
+    event2.timeStamp = System.currentTimeMillis() * 1000000;
+
+    out.events = new GenericData.Array<TimestampedEvent>(
+        2, Schema.createArray(TimestampedEvent.SCHEMA$));
+    out.events.add(event1);
+    out.events.add(event2);
+
+    return out;
+  }
+}