phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sama...@apache.org
Subject phoenix git commit: PHOENIX-3752 Remove hadoop metrics integration from the tracing framework (Karan Mehta)
Date Fri, 26 May 2017 17:38:05 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 b61e7d789 -> 118b932f6


PHOENIX-3752 Remove hadoop metrics integration from the tracing framework (Karan Mehta)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/118b932f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/118b932f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/118b932f

Branch: refs/heads/4.x-HBase-1.2
Commit: 118b932f65fa466a9bf7861c2c01ab1027866515
Parents: b61e7d7
Author: Samarth Jain <samarth@apache.org>
Authored: Fri May 26 10:38:06 2017 -0700
Committer: Samarth Jain <samarth@apache.org>
Committed: Fri May 26 10:38:06 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/trace/BaseTracingTestIT.java |  66 ++++
 .../phoenix/trace/DisableableMetricsWriter.java |  84 -----
 .../trace/PhoenixTableMetricsWriterIT.java      |  43 ++-
 .../phoenix/trace/PhoenixTraceReaderIT.java     | 180 -----------
 .../phoenix/trace/PhoenixTracingEndToEndIT.java | 223 ++++++++-----
 .../org/apache/phoenix/query/QueryServices.java |   4 +
 .../phoenix/query/QueryServicesOptions.java     |  78 +++--
 .../apache/phoenix/trace/TraceMetricSource.java | 183 -----------
 .../org/apache/phoenix/trace/TraceReader.java   |  16 +-
 .../apache/phoenix/trace/TraceSpanReceiver.java |  95 ++++++
 .../org/apache/phoenix/trace/TraceWriter.java   | 323 +++++++++++++++++++
 .../org/apache/phoenix/trace/util/Tracing.java  |  15 +-
 .../phoenix/trace/TraceMetricsSourceTest.java   |  37 +--
 13 files changed, 745 insertions(+), 602 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
index eed5618..e3a7510 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsInfo;
@@ -36,6 +37,8 @@ import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.impl.ExposedMetricCounterLong;
 import org.apache.hadoop.metrics2.impl.ExposedMetricsRecordImpl;
 import org.apache.hadoop.metrics2.lib.ExposedMetricsInfoImpl;
+import org.apache.htrace.Span;
+import org.apache.htrace.impl.MilliSpan;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.metrics.MetricInfo;
 import org.apache.phoenix.trace.util.Tracing;
@@ -50,6 +53,10 @@ import org.apache.phoenix.util.PropertiesUtil;
 
 public class BaseTracingTestIT extends ParallelStatsDisabledIT {
 
+    protected CountDownLatch latch;
+    protected int defaultTracingThreadPoolForTest = 1;
+    protected int defaultTracingBatchSizeForTest = 1;
+
     public static Connection getConnectionWithoutTracing() throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         return getConnectionWithoutTracing(props);
@@ -127,4 +134,63 @@ public class BaseTracingTestIT extends ParallelStatsDisabledIT {
     private static MetricsInfo asInfo(String name) {
         return new ExposedMetricsInfoImpl(name, "");
     }
+
+    protected Span createNewSpan(long traceid, long parentid, long spanid, String description,
+            long startTime, long endTime, String processid, String... tags) {
+
+        Span span = new MilliSpan.Builder()
+                .description(description)
+                .traceId(traceid)
+                .parents(new long[] {parentid})
+                .spanId(spanid)
+                .processId(processid)
+                .begin(startTime)
+                .end(endTime)
+                .build();
+
+        int tagCount = 0;
+        for(String annotation : tags) {
+            span.addKVAnnotation((Integer.toString(tagCount++)).getBytes(), annotation.getBytes());
+        }
+        return span;
+    }
+
+
+    private static class CountDownConnection extends DelegateConnection {
+        private CountDownLatch commit;
+
+        public CountDownConnection(Connection conn, CountDownLatch commit) {
+            super(conn);
+            this.commit = commit;
+        }
+
+        @Override
+        public void commit() throws SQLException {
+            super.commit();
+            commit.countDown();
+        }
+
+    }
+
+    protected class TestTraceWriter extends TraceWriter {
+
+        public TestTraceWriter(String tableName, int numThreads, int batchSize) {
+            super(tableName, numThreads, batchSize);
+        }
+
+        @Override
+        protected Connection getConnection(String tableName) {
+            try {
+                Connection connection = new CountDownConnection(getConnectionWithoutTracing(), latch);
+                if(!traceTableExists(connection, tableName)) {
+                    createTable(connection, tableName);
+                }
+                return connection;
+            } catch (SQLException e) {
+                e.printStackTrace();
+            }
+            return null;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java
deleted file mode 100644
index 875717c..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.trace;
-
-import org.apache.commons.configuration.SubsetConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.MetricsSink;
-
-import java.sql.SQLException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- *
- */
-public class DisableableMetricsWriter implements MetricsSink {
-
-    private static final Log LOG = LogFactory.getLog(DisableableMetricsWriter.class);
-    private PhoenixMetricsSink writer;
-    private AtomicBoolean disabled = new AtomicBoolean(false);
-
-    public DisableableMetricsWriter(PhoenixMetricsSink writer) {
-        this.writer = writer;
-    }
-
-    @Override
-    public void init(SubsetConfiguration config) {
-        if (this.disabled.get()) return;
-        writer.init(config);
-    }
-
-    @Override
-    public void flush() {
-        if (this.disabled.get()) {
-            clear();
-            return;
-        }
-        writer.flush();
-
-    }
-
-    @Override
-    public void putMetrics(MetricsRecord record) {
-        if (this.disabled.get()) return;
-        writer.putMetrics(record);
-    }
-
-    public void disable() {
-        this.disabled.set(true);
-    }
-
-    public void enable() {
-        this.disabled.set(false);
-    }
-
-    public void clear() {
-        // clear any pending writes
-        try {
-            writer.clearForTesting();
-        } catch (SQLException e) {
-            LOG.error("Couldn't clear the delgate writer when flush called and disabled", e);
-        }
-    }
-
-    public PhoenixMetricsSink getDelegate() {
-        return this.writer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
index dbb34ba..88ab6ff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
@@ -18,12 +18,15 @@
 package org.apache.phoenix.trace;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.htrace.Span;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.TraceReader.SpanInfo;
 import org.apache.phoenix.trace.TraceReader.TraceHolder;
@@ -34,6 +37,7 @@ import org.junit.Test;
  */
 public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
 
+    private TestTraceWriter testTraceWriter;
     /**
      * IT should create the target table if it hasn't been created yet, but not fail if the table
      * has already been created
@@ -41,10 +45,9 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
      */
     @Test
     public void testCreatesTable() throws Exception {
-        PhoenixMetricsSink sink = new PhoenixMetricsSink();
+
+        testTraceWriter = new TestTraceWriter(generateUniqueName(), defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
         Connection conn = getConnectionWithoutTracing();
-        String tableName = generateUniqueName();
-        sink.initForTesting(conn, tableName);
 
         // check for existence of the tracing table
         try {
@@ -56,26 +59,21 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
             // expected
         }
 
-        // initialize sink again, which should attempt to create the table, but not fail
-        try {
-            sink.initForTesting(conn, tableName);
-        } catch (Exception e) {
-            fail("Initialization shouldn't fail if table already exists!");
-        }
     }
 
     /**
      * Simple metrics writing and reading check, that uses the standard wrapping in the
-     * {@link PhoenixMetricsSink}
+     * {@link TraceWriter}
      * @throws Exception on failure
      */
     @Test
     public void writeMetrics() throws Exception {
-        // hook up a phoenix sink
-        PhoenixMetricsSink sink = new PhoenixMetricsSink();
+
         Connection conn = getConnectionWithoutTracing();
         String tableName = generateUniqueName();
-        sink.initForTesting(conn, tableName);
+        TraceSpanReceiver traceSpanReceiver = new TraceSpanReceiver();
+        latch = new CountDownLatch(1);
+        testTraceWriter = new TestTraceWriter(tableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
 
         // create a simple metrics record
         long traceid = 987654;
@@ -84,15 +82,14 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
         long parentid = 11;
         long startTime = 12;
         long endTime = 13;
+        String processid = "Some process";
         String annotation = "test annotation for a span";
-        String hostnameValue = "host-name.value";
-       MetricsRecord record =
-                createRecord(traceid, parentid, spanid, description, startTime, endTime,
-                    hostnameValue, annotation);
 
-        // actually write the record to the table
-        sink.putMetrics(record);
-        sink.flush();
+        Span span = createNewSpan(traceid, parentid, spanid, description, startTime, endTime,
+            processid, annotation);
+
+        traceSpanReceiver.getSpanQueue().add(span);
+        assertTrue("Span never committed to table", latch.await(30, TimeUnit.SECONDS));
 
         // make sure we only get expected stat entry (matcing the trace id), otherwise we could the
         // stats for the update as well
@@ -111,8 +108,8 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
         assertEquals(parentid, spanInfo.getParentIdForTesting());
         assertEquals(startTime, spanInfo.start);
         assertEquals(endTime, spanInfo.end);
-        assertEquals(hostnameValue, spanInfo.hostname);
         assertEquals("Wrong number of tags", 0, spanInfo.tagCount);
         assertEquals("Wrong number of annotations", 1, spanInfo.annotationCount);
     }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java
deleted file mode 100644
index 723810f..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.trace;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.AbstractMetric;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.htrace.Span;
-import org.apache.phoenix.metrics.MetricInfo;
-import org.apache.phoenix.trace.TraceReader.SpanInfo;
-import org.apache.phoenix.trace.TraceReader.TraceHolder;
-import org.junit.Test;
-
-/**
- * Test that the {@link TraceReader} will correctly read traces written by the
- * {@link org.apache.phoenix.trace.PhoenixMetricsSink}
- */
-
-public class PhoenixTraceReaderIT extends BaseTracingTestIT {
-
-    private static final Log LOG = LogFactory.getLog(PhoenixTraceReaderIT.class);
-
-    @Test
-    public void singleSpan() throws Exception {
-        PhoenixMetricsSink sink = new PhoenixMetricsSink();
-        Properties props = new Properties(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        String tableName = generateUniqueName();
-        sink.initForTesting(conn, tableName);
-
-        // create a simple metrics record
-        long traceid = 987654;
-        MetricsRecord record =
-                createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 10, "root", 12, 13,
-                    "host-name.value", "test annotation for a span");
-
-        // start a reader
-        validateTraces(Collections.singletonList(record), conn, traceid, tableName);
-    }
-
-    private MetricsRecord createAndFlush(PhoenixMetricsSink sink, long traceid,
-            long parentid, long spanid, String desc, long startTime, long endTime, String hostname,
-            String... tags) {
-        MetricsRecord record =
-                createRecord(traceid, parentid, spanid, desc, startTime, endTime, hostname, tags);
-        sink.putMetrics(record);
-        sink.flush();
-        return record;
-    }
-
-    /**
-     * Test multiple spans, within the same trace. Some spans are independent of the parent span,
-     * some are child spans
-     * @throws Exception on failure
-     */
-    @Test
-    public void testMultipleSpans() throws Exception {
-        // hook up a phoenix sink
-        PhoenixMetricsSink sink = new PhoenixMetricsSink();
-        Connection conn = getConnectionWithoutTracing();
-        String tableName = generateUniqueName();
-        sink.initForTesting(conn, tableName);
-
-        // create a simple metrics record
-        long traceid = 12345;
-        List<MetricsRecord> records = new ArrayList<MetricsRecord>();
-        MetricsRecord record =
-                createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 7777, "root", 10, 30,
-                    "hostname.value", "root-span tag");
-        records.add(record);
-
-        // then create a child record
-        record =
-                createAndFlush(sink, traceid, 7777, 6666, "c1", 11, 15, "hostname.value",
-                    "first child");
-        records.add(record);
-
-        // create a different child
-        record =
-                createAndFlush(sink, traceid, 7777, 5555, "c2", 11, 18, "hostname.value",
-                    "second child");
-        records.add(record);
-
-        // create a child of the second child
-        record =
-                createAndFlush(sink, traceid, 5555, 4444, "c3", 12, 16, "hostname.value",
-                    "third child");
-        records.add(record);
-
-        // flush all the values to the table
-        sink.flush();
-
-        // start a reader
-        validateTraces(records, conn, traceid, tableName);
-    }
-
-    private void validateTraces(List<MetricsRecord> records, Connection conn, long traceid, String tableName)
-            throws Exception {
-        TraceReader reader = new TraceReader(conn, tableName);
-        Collection<TraceHolder> traces = reader.readAll(1);
-        assertEquals("Got an unexpected number of traces!", 1, traces.size());
-        // make sure the trace matches what we wrote
-        TraceHolder trace = traces.iterator().next();
-        assertEquals("Got an unexpected traceid", traceid, trace.traceid);
-        assertEquals("Got an unexpected number of spans", records.size(), trace.spans.size());
-
-        validateTrace(records, trace);
-    }
-
-    /**
-     * @param records
-     * @param trace
-     */
-    private void validateTrace(List<MetricsRecord> records, TraceHolder trace) {
-        // drop each span into a sorted list so we get the expected ordering
-        Iterator<SpanInfo> spanIter = trace.spans.iterator();
-        for (MetricsRecord record : records) {
-            SpanInfo spanInfo = spanIter.next();
-            LOG.info("Checking span:\n" + spanInfo);
-            Iterator<AbstractMetric> metricIter = record.metrics().iterator();
-            assertEquals("Got an unexpected span id", metricIter.next().value(), spanInfo.id);
-            long parentId = (Long) metricIter.next().value();
-            if (parentId == Span.ROOT_SPAN_ID) {
-                assertNull("Got a parent, but it was a root span!", spanInfo.parent);
-            } else {
-                assertEquals("Got an unexpected parent span id", parentId, spanInfo.parent.id);
-            }
-            assertEquals("Got an unexpected start time", metricIter.next().value(), spanInfo.start);
-            assertEquals("Got an unexpected end time", metricIter.next().value(), spanInfo.end);
-
-            Iterator<MetricsTag> tags = record.tags().iterator();
-
-            int annotationCount = 0;
-            while (tags.hasNext()) {
-                // hostname is a tag, so we differentiate it
-                MetricsTag tag = tags.next();
-                if (tag.name().equals(MetricInfo.HOSTNAME.traceName)) {
-                    assertEquals("Didn't store correct hostname value", tag.value(),
-                        spanInfo.hostname);
-                } else {
-                    int count = annotationCount++;
-                    assertEquals("Didn't get expected annotation", count + " - " + tag.value(),
-                        spanInfo.annotations.get(count));
-                }
-            }
-            assertEquals("Didn't get expected number of annotations", annotationCount,
-                spanInfo.annotationCount);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
index 5e05fe8..4477fa5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
@@ -18,9 +18,8 @@
 package org.apache.phoenix.trace;
 
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.*;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -28,13 +27,13 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Collection;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Span;
 import org.apache.htrace.SpanReceiver;
@@ -43,11 +42,12 @@ import org.apache.htrace.TraceScope;
 import org.apache.htrace.impl.ProbabilitySampler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.metrics.Metrics;
 import org.apache.phoenix.trace.TraceReader.SpanInfo;
 import org.apache.phoenix.trace.TraceReader.TraceHolder;
+import org.apache.phoenix.trace.util.Tracing;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableMap;
@@ -56,52 +56,26 @@ import com.google.common.collect.ImmutableMap;
  * Test that the logging sink stores the expected metrics/stats
  */
 
-// Marking this class as abstract till PHOENIX-3062 is fixed.
-// FIXME: PHOENIX-3062
-public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
+public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
 
     private static final Log LOG = LogFactory.getLog(PhoenixTracingEndToEndIT.class);
     private static final int MAX_RETRIES = 10;
     private String enabledForLoggingTable;
     private String enableForLoggingIndex;
-
-    private DisableableMetricsWriter sink;
     private String tracingTableName;
+    private TestTraceWriter testTraceWriter = null;
 
     @Before
     public void setupMetrics() throws Exception {
-        PhoenixMetricsSink pWriter = new PhoenixMetricsSink();
-        Connection conn = getConnectionWithoutTracing();
         tracingTableName = "TRACING_" + generateUniqueName();
-        pWriter.initForTesting(conn, tracingTableName);
-        sink = new DisableableMetricsWriter(pWriter);
         enabledForLoggingTable = "ENABLED_FOR_LOGGING_" + generateUniqueName();
         enableForLoggingIndex = "ENABALED_FOR_LOGGING_INDEX_" + generateUniqueName();
-
-        TracingTestUtil.registerSink(sink, tracingTableName);
     }
 
     @After
-    public void cleanup() {
-        sink.disable();
-        sink.clear();
-        TracingTestUtil.unregisterSink(tracingTableName);
-    }
-
-    private void waitForCommit(CountDownLatch latch) throws SQLException {
-        Connection conn = new CountDownConnection(getConnectionWithoutTracing(), latch);
-        replaceWriterConnection(conn);
-    }
-
-    private void replaceWriterConnection(Connection conn) throws SQLException {
-        // disable the writer
-        sink.disable();
-
-        // swap the connection for one that listens
-        sink.getDelegate().initForTesting(conn, tracingTableName);
-
-        // enable the writer
-        sink.enable();
+    public void cleanUp() {
+        if(testTraceWriter != null)
+            testTraceWriter.stop();
     }
 
     /**
@@ -110,15 +84,11 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
      */
     @Test
     public void testWriteSpans() throws Exception {
-        // get a receiver for the spans
-        SpanReceiver receiver = new TraceMetricSource();
-        // which also needs to a source for the metrics system
-        Metrics.initialize().register("testWriteSpans-source", "source for testWriteSpans",
-                (MetricsSource) receiver);
 
         // watch our sink so we know when commits happen
-        CountDownLatch latch = new CountDownLatch(1);
-        waitForCommit(latch);
+        latch = new CountDownLatch(1);
+
+        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
 
         // write some spans
         TraceScope trace = Trace.startSpan("Start write test", Sampler.ALWAYS);
@@ -136,7 +106,7 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         trace.close();
 
         // pass the trace on
-        receiver.receiveSpan(span);
+        Tracing.getTraceSpanReceiver().receiveSpan(span);
 
         // wait for the tracer to actually do the write
         assertTrue("Sink not flushed. commit() not called on the connection", latch.await(60, TimeUnit.SECONDS));
@@ -176,10 +146,12 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
      * @throws Exception
      */
     @Test
+    @Ignore
     public void testClientServerIndexingTracing() throws Exception {
+
         // one call for client side, one call for server side
-        final CountDownLatch updated = new CountDownLatch(2);
-        waitForCommit(updated);
+        latch = new CountDownLatch(2);
+        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
 
         // separate connection so we don't create extra traces
         Connection conn = getConnectionWithoutTracing();
@@ -203,7 +175,7 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
 
         // wait for the latch to countdown, as the metrics system is time-based
         LOG.debug("Waiting for latch to complete!");
-        updated.await(200, TimeUnit.SECONDS);// should be way more than GC pauses
+        latch.await(200, TimeUnit.SECONDS);// should be way more than GC pauses
 
         // read the traces back out
 
@@ -259,8 +231,8 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         Connection conn = getConnectionWithoutTracing();
 
         // one call for client side, one call for server side
-        CountDownLatch updated = new CountDownLatch(2);
-        waitForCommit(updated);
+        latch = new CountDownLatch(2);
+        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
 
         // create a dummy table
         createTestTable(conn, false);
@@ -289,7 +261,7 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         assertTrue("Didn't get second result", results.next());
         results.close();
 
-        assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS));
+        assertTrue("Get expected updates to trace table", latch.await(200, TimeUnit.SECONDS));
         // don't trace reads either
         boolean tracingComplete = checkStoredTraces(conn, new TraceChecker(){
 
@@ -309,8 +281,8 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         Connection conn = getConnectionWithoutTracing();
 
         // one call for client side, one call for server side
-        CountDownLatch updated = new CountDownLatch(2);
-        waitForCommit(updated);
+        latch = new CountDownLatch(5);
+        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
 
         // create a dummy table
         createTestTable(conn, false);
@@ -338,7 +310,8 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         assertEquals("Didn't get the expected number of row", 2, results.getInt(1));
         results.close();
 
-        assertTrue("Didn't get expected updates to trace table", updated.await(60, TimeUnit.SECONDS));
+        assertTrue("Didn't get expected updates to trace table", latch.await(60, TimeUnit.SECONDS));
+
         // don't trace reads either
         boolean found = checkStoredTraces(conn, new TraceChecker() {
             @Override
@@ -360,8 +333,8 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         Connection conn = getConnectionWithoutTracing();
 
         // one call for client side, one call for server side
-        CountDownLatch updated = new CountDownLatch(2);
-        waitForCommit(updated);
+        latch = new CountDownLatch(2);
+        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
 
         // create a dummy table
         createTestTable(conn, false);
@@ -390,7 +363,7 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         assertTrue("Didn't get second result", results.next());
         results.close();
 
-        assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS));
+        assertTrue("Get expected updates to trace table", latch.await(200, TimeUnit.SECONDS));
 
         assertAnnotationPresent(customAnnotationKey, customAnnotationValue, conn);
         assertAnnotationPresent(TENANT_ID_ATTRIB, tenantId, conn);
@@ -399,12 +372,12 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
 
     @Test
     public void testTraceOnOrOff() throws Exception {
-        Connection conn1 = DriverManager.getConnection(getUrl());
+        Connection conn1 = getConnectionWithoutTracing();
         try{
             Statement statement = conn1.createStatement();
             ResultSet  rs = statement.executeQuery("TRACE ON");
             assertTrue(rs.next());
-            PhoenixConnection pconn= (PhoenixConnection) conn1;
+            PhoenixConnection pconn = (PhoenixConnection) conn1;
             long traceId = pconn.getTraceScope().getSpan().getTraceId();
             assertEquals(traceId, rs.getLong(1));
             assertEquals(traceId, rs.getLong("trace_id"));
@@ -444,16 +417,131 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
 
             rs = statement.executeQuery("TRACE OFF");
             assertFalse(rs.next());
+
        } finally {
             conn1.close();
         }
     }
 
+    @Test
+    public void testSingleSpan() throws Exception {
+
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        latch = new CountDownLatch(1);
+        testTraceWriter = new TestTraceWriter(tableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
+
+        // create a simple metrics record
+        long traceid = 987654;
+        Span span = createNewSpan(traceid, Span.ROOT_SPAN_ID, 10, "root", 12, 13, "Some process", "test annotation for a span");
+
+        Tracing.getTraceSpanReceiver().receiveSpan(span);
+        assertTrue("Updates not written in table", latch.await(60, TimeUnit.SECONDS));
+
+        // start a reader
+        validateTraces(Collections.singletonList(span), conn, traceid, tableName);
+    }
+
+    /**
+     * Test multiple spans, within the same trace. Some spans are independent of the parent span,
+     * some are child spans
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultipleSpans() throws Exception {
+
+        Connection conn = getConnectionWithoutTracing();
+        String tableName = generateUniqueName();
+        latch = new CountDownLatch(4);
+        testTraceWriter = new TestTraceWriter(tableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest);
+
+        // create a simple metrics record
+        long traceid = 12345;
+        List<Span> spans = new ArrayList<Span>();
+
+        Span span =
+                createNewSpan(traceid, Span.ROOT_SPAN_ID, 7777, "root", 10, 30,
+                        "root process", "root-span tag");
+        spans.add(span);
+
+        // then create a child record
+        span =
+                createNewSpan(traceid, 7777, 6666, "c1", 11, 15, "c1 process",
+                        "first child");
+        spans.add(span);
+
+        // create a different child
+        span =
+                createNewSpan(traceid, 7777, 5555, "c2", 11, 18, "c2 process",
+                        "second child");
+        spans.add(span);
+
+        // create a child of the second child
+        span =
+                createNewSpan(traceid, 5555, 4444, "c3", 12, 16, "c3 process",
+                        "third child");
+        spans.add(span);
+
+        for(Span span1 : spans)
+            Tracing.getTraceSpanReceiver().receiveSpan(span1);
+
+        assertTrue("Updates not written in table", latch.await(100, TimeUnit.SECONDS));
+
+        // start a reader
+        validateTraces(spans, conn, traceid, tableName);
+    }
+
+    private void validateTraces(List<Span> spans, Connection conn, long traceid, String tableName)
+            throws Exception {
+        TraceReader reader = new TraceReader(conn, tableName);
+        Collection<TraceHolder> traces = reader.readAll(1);
+        assertEquals("Got an unexpected number of traces!", 1, traces.size());
+        // make sure the trace matches what we wrote
+        TraceHolder trace = traces.iterator().next();
+        assertEquals("Got an unexpected traceid", traceid, trace.traceid);
+        assertEquals("Got an unexpected number of spans", spans.size(), trace.spans.size());
+
+        validateTrace(spans, trace);
+    }
+
+    /**
+     * @param spans
+     * @param trace
+     */
+    private void validateTrace(List<Span> spans, TraceHolder trace) {
+        // drop each span into a sorted list so we get the expected ordering
+        Iterator<SpanInfo> spanIter = trace.spans.iterator();
+        for (Span span : spans) {
+            SpanInfo spanInfo = spanIter.next();
+            LOG.info("Checking span:\n" + spanInfo);
+
+            long parentId = span.getParentId();
+            if(parentId == Span.ROOT_SPAN_ID) {
+                assertNull("Got a parent, but it was a root span!", spanInfo.parent);
+            } else {
+                assertEquals("Got an unexpected parent span id", parentId, spanInfo.parent.id);
+            }
+
+            assertEquals("Got an unexpected start time", span.getStartTimeMillis(), spanInfo.start);
+            assertEquals("Got an unexpected end time", span.getStopTimeMillis(), spanInfo.end);
+
+            int annotationCount = 0;
+            for(Map.Entry<byte[], byte[]> entry : span.getKVAnnotations().entrySet()) {
+                int count = annotationCount++;
+                assertEquals("Didn't get expected annotation", count + " - " + Bytes.toString(entry.getValue()),
+                        spanInfo.annotations.get(count));
+            }
+            assertEquals("Didn't get expected number of annotations", annotationCount,
+                    spanInfo.annotationCount);
+        }
+    }
+
     private void assertAnnotationPresent(final String annotationKey, final String annotationValue, Connection conn) throws Exception {
         boolean tracingComplete = checkStoredTraces(conn, new TraceChecker(){
             @Override
             public boolean foundTrace(TraceHolder currentTrace) {
-            	return currentTrace.toString().contains(annotationKey + " - " + annotationValue);
+                return currentTrace.toString().contains(annotationKey + " - " + annotationValue);
             }
         });
 
@@ -496,19 +584,4 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         }
     }
 
-    private static class CountDownConnection extends DelegateConnection {
-        private CountDownLatch commit;
-
-        public CountDownConnection(Connection conn, CountDownLatch commit) {
-            super(conn);
-            this.commit = commit;
-        }
-
-        @Override
-        public void commit() throws SQLException {
-            commit.countDown();
-            super.commit();
-        }
-
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 81d05bd..331b596 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -155,6 +155,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String TRACING_PROBABILITY_THRESHOLD_ATTRIB = "phoenix.trace.probability.threshold";
     public static final String TRACING_STATS_TABLE_NAME_ATTRIB = "phoenix.trace.statsTableName";
     public static final String TRACING_CUSTOM_ANNOTATION_ATTRIB_PREFIX = "phoenix.trace.custom.annotation.";
+    public static final String TRACING_ENABLED = "phoenix.trace.enabled";
+    public static final String TRACING_BATCH_SIZE = "phoenix.trace.batchSize";
+    public static final String TRACING_THREAD_POOL_SIZE = "phoenix.trace.threadPoolSize";
+    public static final String TRACING_TRACE_BUFFER_SIZE = "phoenix.trace.traceBufferSize";
 
     public static final String USE_REVERSE_SCAN_ATTRIB = "phoenix.query.useReverseScan";
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 35eda60..b1d8a7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -82,6 +82,11 @@ import static org.apache.phoenix.query.QueryServices.TRANSACTIONS_ENABLED;
 import static org.apache.phoenix.query.QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING;
 import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.TRACING_ENABLED;
+import static org.apache.phoenix.query.QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.TRACING_BATCH_SIZE;
+import static org.apache.phoenix.query.QueryServices.TRACING_THREAD_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.TRACING_TRACE_BUFFER_SIZE;
 
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -129,6 +134,10 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated
     public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also.
     public static final long DEFAULT_DRIVER_SHUTDOWN_TIMEOUT_MS = 5  * 1000; // Time to wait in ShutdownHook to exit gracefully.
+    public static final boolean DEFAULT_TRACING_ENABLED = false;
+    public static final int DEFAULT_TRACING_THREAD_POOL_SIZE = 5;
+    public static final int DEFAULT_TRACING_BATCH_SIZE = 100;
+    public static final int DEFAULT_TRACING_TRACE_BUFFER_SIZE = 1000;
 
     @Deprecated //use DEFAULT_MUTATE_BATCH_SIZE_BYTES
     public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for UPSERT SELECT and DELETE
@@ -227,12 +236,12 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_TABLE_ISTRANSACTIONAL = false;
     public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false;
     public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
-    
+
     public static final boolean DEFAULT_TRANSACTIONAL = false;
     public static final boolean DEFAULT_AUTO_FLUSH = false;
 
     private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
-    
+
     public static final String DEFAULT_CONSISTENCY_LEVEL = Consistency.STRONG.toString();
 
     public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
@@ -241,10 +250,10 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false;
     public static final boolean DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE = true;
     public static final int DEFAULT_MAX_VERSIONS_TRANSACTIONAL = Integer.MAX_VALUE;
-    
+
     public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false;
     public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = "";
-    
+
     public static final long DEFAULT_INDEX_POPULATION_SLEEP_TIME = 5000;
 
     // QueryServer defaults -- ensure ThinClientUtil is also updated since phoenix-queryserver-client
@@ -264,9 +273,9 @@ public class QueryServicesOptions {
     public static final int DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE = 10;
     public static final boolean DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE = true;
     public static final float DEFAULT_LIMITED_QUERY_SERIAL_THRESHOLD = 0.2f;
-    
+
     public static final boolean DEFAULT_INDEX_ASYNC_BUILD_ENABLED = true;
-    
+
     public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString();
     public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
     public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000;
@@ -361,7 +370,10 @@ public class QueryServicesOptions {
             .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE)
             .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)
             .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED)
-            .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING);
+            .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
+            .setIfUnset(TRACING_ENABLED, DEFAULT_TRACING_ENABLED)
+            .setIfUnset(TRACING_BATCH_SIZE, DEFAULT_TRACING_BATCH_SIZE)
+            .setIfUnset(TRACING_THREAD_POOL_SIZE, DEFAULT_TRACING_THREAD_POOL_SIZE);
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
         // it to 1, so we'll change it.
@@ -556,7 +568,33 @@ public class QueryServicesOptions {
     public int getSpillableGroupByNumSpillFiles() {
         return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
     }
-    
+
+    public boolean isTracingEnabled() {
+        return config.getBoolean(TRACING_ENABLED, DEFAULT_TRACING_ENABLED);
+    }
+
+    public QueryServicesOptions setTracingEnabled(boolean enable) {
+        config.setBoolean(TRACING_ENABLED, enable);
+        return this;
+    }
+
+    public int getTracingThreadPoolSize() {
+        return config.getInt(TRACING_THREAD_POOL_SIZE, DEFAULT_TRACING_THREAD_POOL_SIZE);
+    }
+
+    public int getTracingBatchSize() {
+        return config.getInt(TRACING_BATCH_SIZE, DEFAULT_TRACING_BATCH_SIZE);
+    }
+
+    public int getTracingTraceBufferSize() {
+        return config.getInt(TRACING_TRACE_BUFFER_SIZE, DEFAULT_TRACING_TRACE_BUFFER_SIZE);
+    }
+
+    public String getTableName() {
+        return config.get(TRACING_STATS_TABLE_NAME_ATTRIB, DEFAULT_TRACING_STATS_TABLE_NAME);
+    }
+
+
     public boolean isGlobalMetricsEnabled() {
         return config.getBoolean(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED);
     }
@@ -647,9 +685,9 @@ public class QueryServicesOptions {
     public QueryServicesOptions setDelayInMillisForSchemaChangeCheck(long delayInMillis) {
         config.setLong(DELAY_FOR_SCHEMA_UPDATE_CHECK, delayInMillis);
         return this;
-    
+
     }
-    
+
     public QueryServicesOptions setUseByteBasedRegex(boolean flag) {
         config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag);
         return this;
@@ -659,7 +697,7 @@ public class QueryServicesOptions {
         config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder);
         return this;
     }
-    
+
     public QueryServicesOptions setExtraJDBCArguments(String extraArgs) {
         config.set(EXTRA_JDBC_ARGUMENTS_ATTRIB, extraArgs);
         return this;
@@ -674,40 +712,40 @@ public class QueryServicesOptions {
         config.setBoolean(COMMIT_STATS_ASYNC, flag);
         return this;
     }
-    
+
     public QueryServicesOptions setEnableRenewLease(boolean enable) {
         config.setBoolean(RENEW_LEASE_ENABLED, enable);
         return this;
     }
-    
+
     public QueryServicesOptions setIndexHandlerCount(int count) {
         config.setInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, count);
         return this;
     }
-    
+
     public QueryServicesOptions setMetadataHandlerCount(int count) {
         config.setInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, count);
         return this;
     }
-    
+
     public QueryServicesOptions setHConnectionPoolCoreSize(int count) {
         config.setInt(QueryServices.HCONNECTION_POOL_CORE_SIZE, count);
         return this;
     }
-    
+
     public QueryServicesOptions setHConnectionPoolMaxSize(int count) {
         config.setInt(QueryServices.HCONNECTION_POOL_MAX_SIZE, count);
         return this;
     }
-    
+
     public QueryServicesOptions setMaxThreadsPerHTable(int count) {
         config.setInt(QueryServices.HTABLE_MAX_THREADS, count);
         return this;
     }
-    
+
     public QueryServicesOptions setDefaultIndexPopulationWaitTime(long waitTime) {
         config.setLong(INDEX_POPULATION_SLEEP_TIME, waitTime);
         return this;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java
deleted file mode 100644
index e92dd6a..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.trace;
-
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.metrics2.*;
-import org.apache.hadoop.metrics2.lib.Interns;
-import org.apache.phoenix.metrics.MetricInfo;
-import org.apache.phoenix.metrics.Metrics;
-import org.apache.htrace.HTraceConfiguration;
-import org.apache.htrace.Span;
-import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.TimelineAnnotation;
-import org.apache.htrace.impl.MilliSpan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import static org.apache.phoenix.metrics.MetricInfo.*;
-
-/**
- * Sink for request traces ({@link SpanReceiver}) that pushes writes to {@link MetricsSource} in a
- * format that we can more easily consume.
- * <p>
- * <p>
- * Rather than write directly to a phoenix table, we drop it into the metrics queue so we can more
- * cleanly handle it asyncrhonously.Currently, {@link MilliSpan} submits the span in a synchronized
- * block to all the receivers, which could have a lot of overhead if we are submitting to multiple
- * receivers.
- * <p>
- * The format of the generated metrics is this:
- * <ol>
- *   <li>All Metrics from the same span have the same name (allowing correlation in the sink)</li>
- *   <li>The description of the metric describes what it contains. For instance,
- *   <ul>
- *     <li>{@link MetricInfo#PARENT} is the id of the parent of this span. (Root span is
- *     {@link Span#ROOT_SPAN_ID}).</li>
- *     <li>{@value MetricInfo#START} is the start time of the span</li>
- *     <li>{@value MetricInfo#END} is the end time of the span</li>
- *   </ul></li>
- *   <li>Each span's messages are contained in a {@link MetricsTag} with the same name as above and a
- *   generic counter for the number of messages (to differentiate messages and provide timeline
- *   ordering).</li>
- * </ol>
- * <p>
- * <i>So why even submit to metrics2 framework if we only have a single source?</i>
- * <p>
- * This allows us to make the updates in batches. We might have spans that finish before other spans
- * (for instance in the same parent). By batching the updates we can lessen the overhead on the
- * client, which is also busy doing 'real' work. <br>
- * We could make our own queue and manage batching and filtering and dropping extra metrics, but
- * that starts to get complicated fast (its not as easy as it sounds) so we use metrics2 to abstract
- * out that pipeline and also provides us flexibility to dump metrics to other sources.
- * <p>
- * This is a somewhat rough implementation - we do excessive locking for correctness,
- * rather than trying to make it fast, for the moment.
- */
-public class TraceMetricSource implements SpanReceiver, MetricsSource {
-
-  private static final String EMPTY_STRING = "";
-
-  private static final String CONTEXT = "tracing";
-
-  private List<Metric> spans = new ArrayList<Metric>();
-
-  public TraceMetricSource() {
-
-    MetricsSystem manager = Metrics.initialize();
-
-    // Register this instance.
-    // For right now, we ignore the MBean registration issues that show up in DEBUG logs. Basically,
-    // we need a Jmx MBean compliant name. We'll get to a better name when we want that later
-    manager.register(CONTEXT, "Phoenix call tracing", this);
-  }
-
-  @Override
-  public void receiveSpan(Span span) {
-    Metric builder = new Metric(span);
-    // add all the metrics for the span
-    builder.addCounter(Interns.info(SPAN.traceName, EMPTY_STRING), span.getSpanId());
-    builder.addCounter(Interns.info(PARENT.traceName, EMPTY_STRING), span.getParentId());
-    builder.addCounter(Interns.info(START.traceName, EMPTY_STRING), span.getStartTimeMillis());
-    builder.addCounter(Interns.info(END.traceName, EMPTY_STRING), span.getStopTimeMillis());
-    // add the tags to the span. They were written in order received so we mark them as such
-    for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
-      builder.add(new MetricsTag(Interns.info(TAG.traceName, Long.toString(ta.getTime())), ta
-          .getMessage()));
-    }
-
-    // add the annotations. We assume they are serialized as strings and integers, but that can
-    // change in the future
-    Map<byte[], byte[]> annotations = span.getKVAnnotations();
-    for (Entry<byte[], byte[]> annotation : annotations.entrySet()) {
-      Pair<String, String> val =
-          TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue());
-      builder.add(new MetricsTag(Interns.info(ANNOTATION.traceName, val.getFirst()), val
-          .getSecond()));
-    }
-
-    // add the span to the list we care about
-    synchronized (this) {
-      spans.add(builder);
-    }
-  }
-
-  @Override
-  public void getMetrics(MetricsCollector collector, boolean all) {
-    // add a marker record so we know how many spans are used
-    // this is also necessary to ensure that we register the metrics source as an MBean (avoiding a
-    // runtime warning)
-    MetricsRecordBuilder marker = collector.addRecord(TracingUtils.METRICS_MARKER_CONTEXT);
-    marker.add(new MetricsTag(new MetricsInfoImpl("stat", "num spans"), Integer
-        .toString(spans.size())));
-
-    // actually convert the known spans into metric records as well
-    synchronized (this) {
-      for (Metric span : spans) {
-        MetricsRecordBuilder builder = collector.addRecord(new MetricsInfoImpl(TracingUtils
-            .getTraceMetricName(span.id), span.desc));
-        builder.setContext(TracingUtils.METRICS_CONTEXT);
-        for (Pair<MetricsInfo, Long> metric : span.counters) {
-          builder.addCounter(metric.getFirst(), metric.getSecond());
-        }
-        for (MetricsTag tag : span.tags) {
-          builder.add(tag);
-        }
-      }
-      // reset the spans so we don't keep a big chunk of memory around
-      spans = new ArrayList<Metric>();
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    // noop
-  }
-
-  private static class Metric {
-
-    List<Pair<MetricsInfo, Long>> counters = new ArrayList<Pair<MetricsInfo, Long>>();
-    List<MetricsTag> tags = new ArrayList<MetricsTag>();
-    private String id;
-    private String desc;
-
-    public Metric(Span span) {
-      this.id = Long.toString(span.getTraceId());
-      this.desc = span.getDescription();
-    }
-
-    /**
-     * @param metricsInfoImpl
-     * @param startTimeMillis
-     */
-    public void addCounter(MetricsInfo metricsInfoImpl, long startTimeMillis) {
-      counters.add(new Pair<MetricsInfo, Long>(metricsInfoImpl, startTimeMillis));
-    }
-
-    /**
-     * @param metricsTag
-     */
-    public void add(MetricsTag metricsTag) {
-      tags.add(metricsTag);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
index 318453f..68b945c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
@@ -14,7 +14,8 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */package org.apache.phoenix.trace;
+ */
+package org.apache.phoenix.trace;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -30,6 +31,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.metrics.MetricInfo;
 import org.apache.phoenix.query.QueryServices;
@@ -40,7 +42,7 @@ import com.google.common.base.Joiner;
 import com.google.common.primitives.Longs;
 
 /**
- * Read the traces written to phoenix tables by the {@link PhoenixMetricsSink}.
+ * Read the traces written to phoenix tables by the {@link TraceWriter}.
  */
 public class TraceReader {
 
@@ -54,8 +56,8 @@ public class TraceReader {
                 comma.join(MetricInfo.TRACE.columnName, MetricInfo.PARENT.columnName,
                     MetricInfo.SPAN.columnName, MetricInfo.DESCRIPTION.columnName,
                     MetricInfo.START.columnName, MetricInfo.END.columnName,
-                    MetricInfo.HOSTNAME.columnName, PhoenixMetricsSink.TAG_COUNT,
-                        PhoenixMetricsSink.ANNOTATION_COUNT);
+                    MetricInfo.HOSTNAME.columnName, TraceWriter.TAG_COUNT,
+                    TraceWriter.ANNOTATION_COUNT);
     }
 
     private Connection conn;
@@ -177,13 +179,13 @@ public class TraceReader {
     private Collection<? extends String> getTags(long traceid, long parent, long span, int count)
             throws SQLException {
         return getDynamicCountColumns(traceid, parent, span, count,
-                PhoenixMetricsSink.TAG_FAMILY, MetricInfo.TAG.columnName);
+                TraceWriter.TAG_FAMILY, MetricInfo.TAG.columnName);
     }
 
     private Collection<? extends String> getAnnotations(long traceid, long parent, long span,
             int count) throws SQLException {
         return getDynamicCountColumns(traceid, parent, span, count,
-                PhoenixMetricsSink.ANNOTATION_FAMILY, MetricInfo.ANNOTATION.columnName);
+            TraceWriter.ANNOTATION_FAMILY, MetricInfo.ANNOTATION.columnName);
     }
 
     private Collection<? extends String> getDynamicCountColumns(long traceid, long parent,
@@ -195,7 +197,7 @@ public class TraceReader {
         // build the column strings, family.column<index>
         String[] parts = new String[count];
         for (int i = 0; i < count; i++) {
-            parts[i] = PhoenixMetricsSink.getDynamicColumnName(family, columnName, i);
+            parts[i] = TraceWriter.getDynamicColumnName(family, columnName, i);
         }
         // join the columns together
         String columns = comma.join(parts);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
new file mode 100644
index 0000000..3c71e27
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.Span;
+import org.apache.htrace.SpanReceiver;
+import org.apache.htrace.impl.MilliSpan;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+/**
+ * Sink for request traces ({@link SpanReceiver}) that pushes writes to {@link TraceWriter} in a
+ * format that we can more easily consume.
+ * <p>
+ * <p>
+ * Rather than write directly to a phoenix table, we drop it into the metrics queue so we can more
+ * cleanly handle it asynchronously. Currently, {@link MilliSpan} submits the span in a synchronized
+ * block to all the receivers, which could have a lot of overhead if we are submitting to multiple
+ * receivers.
+ * <p>
+ * The format of the generated metrics is this:
+ * <ol>
+ * <li>All Metrics from the same span have the same trace id (allowing correlation in the sink)</li>
+ * <li>The description of the metric describes what it contains. For instance,
+ * <ul>
+ * <li>{@link MetricInfo#PARENT} is the id of the parent of this span. (Root span is
+ * {@link Span#ROOT_SPAN_ID}).</li>
+ * <li>{@link MetricInfo#START} is the start time of the span</li>
+ * <li>{@link MetricInfo#END} is the end time of the span</li>
+ * </ul>
+ * </li>
+ * </ol>
+ * <p>
+ * <i>So why even submit to {@link TraceWriter} if we only have a single source?</i>
+ * <p>
+ * This allows us to make the updates in batches. We might have spans that finish before other spans
+ * (for instance in the same parent). By batching the updates we can lessen the overhead on the
+ * client, which is also busy doing 'real' work. <br>
+ * This class is a custom implementation of a metrics queue and handles batch writes to the Phoenix table
+ * via another thread. Batch size and number of threads are configurable.
+ * <p>
+ */
+public class TraceSpanReceiver implements SpanReceiver {
+
+    private static final Log LOG = LogFactory.getLog(TraceSpanReceiver.class);
+
+    private static final int CAPACITY = QueryServicesOptions.withDefaults().getTracingTraceBufferSize();
+
+    private static BlockingQueue<Span> spanQueue = null; // NOTE(review): static field reassigned via this.spanQueue in each constructor — last receiver created wins globally; confirm intent or make it an instance field
+
+    public TraceSpanReceiver() {
+        this.spanQueue = new ArrayBlockingQueue<Span>(CAPACITY);
+    }
+
+    @Override
+    public void receiveSpan(Span span) {
+        if (spanQueue.offer(span)) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Span buffered to queue " + span.toJson());
+            }
+        } else if (LOG.isDebugEnabled()) {
+                LOG.debug("Span NOT buffered due to overflow in queue " + span.toJson());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // noop
+    }
+
+    protected BlockingQueue<Span> getSpanQueue() {
+        return spanQueue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java
new file mode 100644
index 0000000..938baa2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.htrace.Span;
+import org.apache.htrace.TimelineAnnotation;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Sink for the trace spans pushed into the queue by {@link TraceSpanReceiver}. The class instantiates a thread pool
+ * of configurable size, which will pull the data from the queue and write it to the Phoenix Trace Table in batches. Various
+ * configuration options include thread pool size and batch commit size.
+ */
+public class TraceWriter {
+    private static final Log LOG = LogFactory.getLog(TraceWriter.class);
+
+    // JDBC placeholder; only string-valued fields are bound as parameters (see addToBatch).
+    private static final String VARIABLE_VALUE = "?";
+
+    private static final Joiner COLUMN_JOIN = Joiner.on(".");
+    static final String TAG_FAMILY = "tags";
+    /**
+     * Count of the number of tags we are storing for this row
+     */
+    static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count");
+
+    static final String ANNOTATION_FAMILY = "annotations";
+    static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count");
+
+    /**
+     * Join strings on a comma
+     */
+    private static final Joiner COMMAS = Joiner.on(',');
+
+    private String tableName;
+    // NOTE(review): upper-case names despite being non-final instance fields; both are
+    // assigned exactly once in the constructor and never modified afterwards.
+    private int BATCH_SIZE;
+    private int NUM_THREADS;
+
+    // Queue shared with TraceSpanReceiver; may be null if tracing was never initialized.
+    protected BlockingQueue<Span> spanQueue;
+
+    private ScheduledExecutorService executor;
+
+    /**
+     * Creates the writer and schedules {@code numThreads} daemon tasks, each of which
+     * flushes the span queue to the Phoenix trace table every 10 seconds.
+     * @param tableName name of the Phoenix table the spans are upserted into
+     * @param numThreads number of flush threads to schedule
+     * @param batchSize number of spans accumulated before a commit is issued
+     */
+    public TraceWriter(String tableName, int numThreads, int batchSize) {
+
+        this.BATCH_SIZE = batchSize;
+        this.NUM_THREADS = numThreads;
+        this.tableName = tableName;
+
+        TraceSpanReceiver traceSpanReceiver = Tracing.getTraceSpanReceiver();
+        spanQueue = traceSpanReceiver != null ? traceSpanReceiver.getSpanQueue() : null;
+        // TraceWriter should be instantiated only once; however, when multiple JUnit tests
+        // run consecutively, each of them initializes its own instance. To prevent them
+        // from interfering with each other, it's safe to clear the queue.
+        if(spanQueue != null)
+            spanQueue.clear();
+        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+        builder.setDaemon(true).setNameFormat("PHOENIX-METRICS-WRITER");
+        executor = Executors.newScheduledThreadPool(NUM_THREADS, builder.build());
+        for (int i = 0; i < NUM_THREADS; i++) {
+            executor.scheduleAtFixedRate(new FlushMetrics(), 0, 10, TimeUnit.SECONDS);
+        }
+        LOG.info("Writing tracing metrics to phoenix table");
+    }
+
+    // NOTE(review): awaitTermination is invoked before shutdownNow, so this always blocks
+    // for the full 5 seconds before the executor is asked to stop; the conventional order
+    // is shutdown()/shutdownNow() first, then awaitTermination — confirm intent.
+    @VisibleForTesting
+    public void stop() {
+        try {
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+            executor.shutdownNow();
+        } catch (InterruptedException e) {
+            LOG.error("Failed to stop the thread. ", e);
+        }
+    }
+
+    /**
+     * Periodic task that drains the span queue, buffers each span into the connection's
+     * pending mutations, and commits every BATCH_SIZE spans.
+     */
+    public class FlushMetrics implements Runnable {
+
+        private Connection conn;
+        // Spans added since the last commit. Carries over between runs, so a partial
+        // batch left at the end of one run is only committed once a later run fills it
+        // up to BATCH_SIZE.
+        private int counter = 0;
+
+        public FlushMetrics() {
+            conn = getConnection(tableName);
+        }
+
+        @Override
+        public void run() {
+
+            // getConnection returns null on failure; in that case every run is a no-op
+            // (the connection is only attempted once, in the constructor).
+            if(conn == null) return;
+            while (!spanQueue.isEmpty()) {
+                Span span = spanQueue.poll();
+                if (null == span) break;
+                // NOTE(review): INFO-level log per span is noisy; consider trace/debug.
+                LOG.info("Span received: " + span.toJson());
+                addToBatch(span);
+                counter++;
+                if (counter >= BATCH_SIZE) {
+                    commitBatch(conn);
+                    counter = 0;
+                }
+            }
+        }
+
+        /**
+         * Builds an UPSERT statement for the span and adds it to the connection's pending
+         * mutations. Numeric span fields (trace/span/parent ids, start/end times) are
+         * inlined into the statement text; string values are bound as parameters.
+         */
+        private void addToBatch(Span span) {
+
+            String stmt = "UPSERT INTO " + tableName + " (";
+            // drop it into the queue of things that should be written
+            List<String> keys = new ArrayList<String>();
+            List<Object> values = new ArrayList<Object>();
+            // we need to keep variable values in a separate set since they may have spaces, which
+            // causes the parser to barf. Instead, we need to add them after the statement is
+            // prepared
+            List<String> variableValues = new ArrayList<String>();
+            keys.add(TRACE.columnName);
+            values.add(span.getTraceId());
+
+            keys.add(DESCRIPTION.columnName);
+            values.add(VARIABLE_VALUE);
+            variableValues.add(span.getDescription());
+
+            keys.add(SPAN.traceName);
+            values.add(span.getSpanId());
+
+            keys.add(PARENT.traceName);
+            values.add(span.getParentId());
+
+            keys.add(START.traceName);
+            values.add(span.getStartTimeMillis());
+
+            keys.add(END.traceName);
+            values.add(span.getStopTimeMillis());
+
+            int annotationCount = 0;
+            int tagCount = 0;
+
+            // add the tags to the span. They were written in order received so we mark them as such
+            for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
+                addDynamicEntry(keys, values, variableValues, TAG_FAMILY, Long.toString(ta.getTime()), ta.getMessage(), TAG, tagCount);
+                tagCount++;
+            }
+
+              // add the annotations. We assume they are serialized as strings and integers, but that can
+              // change in the future
+              Map<byte[], byte[]> annotations = span.getKVAnnotations();
+              for (Map.Entry<byte[], byte[]> annotation : annotations.entrySet()) {
+                  Pair<String, String> val =
+                          TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue());
+                  addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, val.getFirst(), val.getSecond(), ANNOTATION, annotationCount);
+                  annotationCount++;
+              }
+
+            // add the tag count, now that we know it
+            keys.add(TAG_COUNT);
+            // ignore the hostname in the tags, if we know it
+            values.add(tagCount);
+
+            keys.add(ANNOTATION_COUNT);
+            values.add(annotationCount);
+
+            // compile the statement together
+            stmt += COMMAS.join(keys);
+            stmt += ") VALUES (" + COMMAS.join(values) + ")";
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Logging metrics to phoenix table via: " + stmt);
+                LOG.trace("With tags: " + variableValues);
+            }
+            try {
+                // NOTE(review): the PreparedStatement is never closed — potential resource leak.
+                PreparedStatement ps = conn.prepareStatement(stmt);
+                // add everything that wouldn't/may not parse
+                int index = 1;
+                for (String tag : variableValues) {
+                    ps.setString(index++, tag);
+                }
+
+                // Not going through the standard route of using statement.execute() as that code path
+                // is blocked if the metadata hasn't been upgraded to the new minor release.
+                 MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt);
+                 MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+                 MutationState newState = plan.execute();
+                 state.join(newState);
+            } catch (SQLException e) {
+                LOG.error(
+                    "Could not write metric: \n" + span + " to prepared statement:\n" + stmt, e);
+            }
+        }
+    }
+
+    // Builds the dynamic column name: <family>.<column><count>
+    public static String getDynamicColumnName(String family, String column, int count) {
+        return COLUMN_JOIN.join(family, column) + count;
+    }
+
+    /**
+     * Adds one dynamic VARCHAR column (tag or annotation) to the upsert being built.
+     * The value is recorded as "desc - value" and bound later as a statement parameter.
+     */
+    private void addDynamicEntry(List<String> keys, List<Object> values,
+            List<String> variableValues, String family, String desc, String value, MetricInfo metric,
+            int count) {
+        // <family><.dynColumn><count> <VARCHAR>
+            keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR");
+
+            // build the annotation value
+            String val = desc + " - " + value;
+            values.add(VARIABLE_VALUE);
+            variableValues.add(val);
+    }
+
+    /**
+     * Opens a Phoenix connection on the server (with tracing frequency set to NEVER so we
+     * do not trace our own writes) and ensures the trace table exists.
+     * @param tableName name of the trace table to create if missing
+     * @return the connection, or null if connecting or table creation failed
+     */
+    protected Connection getConnection(String tableName) {
+
+        try {
+            // create the phoenix connection
+            Properties props = new Properties();
+            props.setProperty(QueryServices.TRACING_FREQ_ATTRIB, Tracing.Frequency.NEVER.getKey());
+            Configuration conf = HBaseConfiguration.create();
+            Connection conn = QueryUtil.getConnectionOnServer(props, conf);
+
+            if (!traceTableExists(conn, tableName)) {
+                createTable(conn, tableName);
+            }
+
+            LOG.info(
+                "Created new connection for tracing " + conn.toString() + " Table: " + tableName);
+            return conn;
+        } catch (Exception e) {
+            LOG.error("New connection failed for tracing Table: " + tableName, e);
+            return null;
+        }
+    }
+
+    // Returns true if the trace table is already present in this connection's metadata.
+    protected boolean traceTableExists(Connection conn, String traceTableName) throws SQLException {
+        try {
+            PhoenixRuntime.getTable(conn, traceTableName);
+            return true;
+        } catch (TableNotFoundException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Create the trace table with the given name, if it does not already exist.
+     * @param conn connection to use when creating the table
+     * @param table name of the table to create
+     * @throws SQLException if any phoenix operation fails
+     */
+    protected void createTable(Connection conn, String table) throws SQLException {
+        // only primary-key columns can be marked non-null
+        String ddl =
+                "create table if not exists " + table + "( " + TRACE.columnName
+                        + " bigint not null, " + PARENT.columnName + " bigint not null, "
+                        + SPAN.columnName + " bigint not null, " + DESCRIPTION.columnName
+                        + " varchar, " + START.columnName + " bigint, " + END.columnName
+                        + " bigint, " + HOSTNAME.columnName + " varchar, " + TAG_COUNT
+                        + " smallint, " + ANNOTATION_COUNT + " smallint"
+                        + "  CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", "
+                        + PARENT.columnName + ", " + SPAN.columnName + "))\n" +
+                        // We have a config parameter that can be set so that tables are
+                        // transactional by default. If that's set, we still don't want these system
+                        // tables created as transactional tables, make these table non
+                        // transactional
+                        PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+        PreparedStatement stmt = conn.prepareStatement(ddl);
+        stmt.execute();
+    }
+
+    // Commits pending span mutations; errors are logged and swallowed so the flush
+    // thread keeps running.
+    protected void commitBatch(Connection conn) {
+        try {
+            conn.commit();
+        } catch (SQLException e) {
+            LOG.error("Unable to commit traces on conn: " + conn.toString() + " to table: " + tableName, e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
index c9add01..94a89b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
@@ -36,7 +36,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.TraceStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.trace.TraceMetricSource;
+import org.apache.phoenix.trace.TraceSpanReceiver;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -45,6 +45,7 @@ import org.apache.htrace.Tracer;
 import org.apache.htrace.impl.ProbabilitySampler;
 import org.apache.htrace.wrappers.TraceCallable;
 import org.apache.htrace.wrappers.TraceRunnable;
+import org.apache.phoenix.trace.TraceWriter;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -258,6 +259,7 @@ public class Tracing {
      * Track if the tracing system has been initialized for phoenix
      */
     private static boolean initialized = false;
+    private static TraceSpanReceiver traceSpanReceiver = null;
 
     /**
      * Add the phoenix span receiver so we can log the traces. We have a single trace source for the
@@ -266,7 +268,12 @@ public class Tracing {
     public synchronized static void addTraceMetricsSource() {
         try {
             if (!initialized) {
-                Trace.addReceiver(new TraceMetricSource());
+                traceSpanReceiver = new TraceSpanReceiver();
+                Trace.addReceiver(traceSpanReceiver);
+                if(QueryServicesOptions.withDefaults().isTracingEnabled()) {
+                    QueryServicesOptions options = QueryServicesOptions.withDefaults();
+                    new TraceWriter(options.getTableName(), options.getTracingThreadPoolSize(), options.getTracingBatchSize());
+                }
             }
         } catch (RuntimeException e) {
             LOG.warn("Tracing will outputs will not be written to any metrics sink! No "
@@ -281,6 +288,10 @@ public class Tracing {
         initialized = true;
     }
 
+    public static TraceSpanReceiver getTraceSpanReceiver() {
+        return traceSpanReceiver;
+    }
+
     public static boolean isTraceOn(String traceOption) {
         Preconditions.checkArgument(traceOption != null);
         if(traceOption.equalsIgnoreCase("ON")) return true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/118b932f/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java b/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java
index eabcaca..7f307da 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java
@@ -20,24 +20,18 @@ package org.apache.phoenix.trace;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.metrics2.MetricsCollector;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.htrace.Span;
 import org.apache.htrace.impl.MilliSpan;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 /**
- * Test that the @{link TraceMetricSource} correctly handles different kinds of traces
+ * Test that the {@link TraceSpanReceiver} correctly handles different kinds of traces
  */
 public class TraceMetricsSourceTest {
 
   @BeforeClass
   public static void setup() throws Exception{
-    DefaultMetricsSystem.setMiniClusterMode(true);
   }
 
   /**
@@ -48,16 +42,19 @@ public class TraceMetricsSourceTest {
   public void testNonIntegerAnnotations(){
     Span span = getSpan();
     // make sure its less than the length of an integer
+
     byte[] value = Bytes.toBytes("a");
     byte[] someInt = Bytes.toBytes(1);
-    assertTrue(someInt.length >value.length);
+    assertTrue(someInt.length > value.length);
 
     // an annotation that is not an integer
     span.addKVAnnotation(Bytes.toBytes("key"), value);
 
     // Create the sink and write the span
-    TraceMetricSource source = new TraceMetricSource();
+    TraceSpanReceiver source = new TraceSpanReceiver();
     source.receiveSpan(span);
+
+    assertTrue(source.getSpanQueue().size() == 1);
   }
 
   @Test
@@ -67,29 +64,13 @@ public class TraceMetricsSourceTest {
     // add annotation through the phoenix interfaces
     TracingUtils.addAnnotation(span, "message", 10);
 
-    TraceMetricSource source = new TraceMetricSource();
+    TraceSpanReceiver source = new TraceSpanReceiver();
     source.receiveSpan(span);
-  }
-
-  /**
-   * If the source does not write any metrics when there are no spans, i.e. when initialized,
-   * then the metrics system will discard the source, so it needs to always emit some metrics.
-   */
-  @Test
-  public void testWritesInfoWhenNoSpans(){
-    TraceMetricSource source = new TraceMetricSource();
-    MetricsCollector collector = Mockito.mock(MetricsCollector.class);
-    MetricsRecordBuilder builder = Mockito.mock(MetricsRecordBuilder.class);
-    Mockito.when(collector.addRecord(Mockito.anyString())).thenReturn(builder);
-
-    source.getMetrics(collector, true);
 
-    // verify that we add a record and that the record has some info
-    Mockito.verify(collector).addRecord(Mockito.anyString());
-    Mockito.verify(builder).add(Mockito.any(MetricsTag.class));
+    assertTrue(source.getSpanQueue().size() == 1);
   }
 
   private Span getSpan(){
     return new MilliSpan("test span", 0, 1 , 2, "pid");
   }
-}
\ No newline at end of file
+}


Mime
View raw message