phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sama...@apache.org
Subject phoenix git commit: PHOENIX-3896 Fix test failures related to tracing changes (Karan Mehta)
Date Mon, 05 Jun 2017 18:52:36 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 9557da2be -> e838afe4d


PHOENIX-3896 Fix test failures related to tracing changes (Karan Mehta)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e838afe4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e838afe4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e838afe4

Branch: refs/heads/4.x-HBase-1.2
Commit: e838afe4daa2a97faf68280cdacb595461b408c5
Parents: 9557da2
Author: Samarth Jain <samarth@apache.org>
Authored: Mon Jun 5 11:52:30 2017 -0700
Committer: Samarth Jain <samarth@apache.org>
Committed: Mon Jun 5 11:52:30 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/trace/BaseTracingTestIT.java | 130 ++++++++-----------
 .../trace/PhoenixTableMetricsWriterIT.java      |  12 +-
 .../phoenix/trace/PhoenixTracingEndToEndIT.java |  61 +++++----
 .../apache/phoenix/trace/TraceSpanReceiver.java |  15 ++-
 .../org/apache/phoenix/trace/TraceWriter.java   | 128 +++++++++---------
 .../org/apache/phoenix/trace/util/Tracing.java  |   9 +-
 .../phoenix/trace/TraceMetricsSourceTest.java   |  77 -----------
 7 files changed, 174 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e838afe4/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
index e3a7510..1b919fb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
@@ -23,39 +23,56 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.metrics2.AbstractMetric;
-import org.apache.hadoop.metrics2.MetricsInfo;
-import org.apache.hadoop.metrics2.MetricsRecord;
-import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.hadoop.metrics2.impl.ExposedMetricCounterLong;
-import org.apache.hadoop.metrics2.impl.ExposedMetricsRecordImpl;
-import org.apache.hadoop.metrics2.lib.ExposedMetricsInfoImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
 import org.apache.htrace.impl.MilliSpan;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
-import org.apache.phoenix.metrics.MetricInfo;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.trace.util.Tracing.Frequency;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.After;
+import org.junit.Before;
 
 /**
- * Base test for tracing tests - helps manage getting tracing/non-tracing
- * connections, as well as any supporting utils.
+ * Base test for tracing tests - helps manage getting tracing/non-tracing connections, as
well as
+ * any supporting utils.
  */
 
 public class BaseTracingTestIT extends ParallelStatsDisabledIT {
 
+    private static final Log LOG = LogFactory.getLog(BaseTracingTestIT.class);
+
     protected CountDownLatch latch;
     protected int defaultTracingThreadPoolForTest = 1;
     protected int defaultTracingBatchSizeForTest = 1;
+    protected String tracingTableName;
+    protected TraceSpanReceiver traceSpanReceiver = null;
+    protected TestTraceWriter testTraceWriter = null;
+
+    @Before
+    public void setup() {
+        tracingTableName = "TRACING_" + generateUniqueName();
+        traceSpanReceiver = new TraceSpanReceiver();
+        Trace.addReceiver(traceSpanReceiver);
+        testTraceWriter =
+                new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest,
+                        defaultTracingBatchSizeForTest);
+    }
+
+    @After
+    public void cleanUp() {
+        Trace.removeReceiver(traceSpanReceiver);
+        if (testTraceWriter != null) testTraceWriter.stop();
+    }
 
     public static Connection getConnectionWithoutTracing() throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -68,7 +85,7 @@ public class BaseTracingTestIT extends ParallelStatsDisabledIT {
     }
 
     public static Connection getTracingConnection() throws Exception {
-        return getTracingConnection(Collections.<String, String>emptyMap(), null);
+        return getTracingConnection(Collections.<String, String> emptyMap(), null);
     }
 
     public static Connection getTracingConnection(Map<String, String> customAnnotations,
@@ -89,73 +106,21 @@ public class BaseTracingTestIT extends ParallelStatsDisabledIT {
         return DriverManager.getConnection(getUrl(), props);
     }
 
-    public static MetricsRecord createRecord(long traceid, long parentid, long spanid,
-            String desc, long startTime, long endTime, String hostname, String... tags) {
-
-        List<AbstractMetric> metrics = new ArrayList<AbstractMetric>();
-        AbstractMetric span = new ExposedMetricCounterLong(asInfo(MetricInfo
-                .SPAN.traceName),
-                spanid);
-        metrics.add(span);
-
-        AbstractMetric parent = new ExposedMetricCounterLong(asInfo(MetricInfo.PARENT.traceName),
-                parentid);
-        metrics.add(parent);
-
-        AbstractMetric start = new ExposedMetricCounterLong(asInfo(MetricInfo.START.traceName),
-                startTime);
-        metrics.add(start);
-
-        AbstractMetric
-                end =
-                new ExposedMetricCounterLong(asInfo(MetricInfo.END.traceName), endTime);
-        metrics.add(end);
-
-        List<MetricsTag> tagsList = new ArrayList<MetricsTag>();
-        int tagCount = 0;
-        for (String annotation : tags) {
-            MetricsTag tag =
-                    new PhoenixTagImpl(MetricInfo.ANNOTATION.traceName,
-                            Integer.toString(tagCount++), annotation);
-            tagsList.add(tag);
-        }
-        String hostnameValue = "host-name.value";
-        MetricsTag hostnameTag =
-                new PhoenixTagImpl(MetricInfo.HOSTNAME.traceName, "", hostnameValue);
-        tagsList.add(hostnameTag);
-
-        MetricsRecord record =
-                new ExposedMetricsRecordImpl(new ExposedMetricsInfoImpl(TracingUtils
-                        .getTraceMetricName(traceid), desc), System.currentTimeMillis(),
-                        tagsList, metrics);
-        return record;
-    }
-
-    private static MetricsInfo asInfo(String name) {
-        return new ExposedMetricsInfoImpl(name, "");
-    }
-
     protected Span createNewSpan(long traceid, long parentid, long spanid, String description,
             long startTime, long endTime, String processid, String... tags) {
 
-        Span span = new MilliSpan.Builder()
-                .description(description)
-                .traceId(traceid)
-                .parents(new long[] {parentid})
-                .spanId(spanid)
-                .processId(processid)
-                .begin(startTime)
-                .end(endTime)
-                .build();
+        Span span =
+                new MilliSpan.Builder().description(description).traceId(traceid)
+                        .parents(new long[] { parentid }).spanId(spanid).processId(processid)
+                        .begin(startTime).end(endTime).build();
 
         int tagCount = 0;
-        for(String annotation : tags) {
+        for (String annotation : tags) {
             span.addKVAnnotation((Integer.toString(tagCount++)).getBytes(), annotation.getBytes());
         }
         return span;
     }
 
-
     private static class CountDownConnection extends DelegateConnection {
         private CountDownLatch commit;
 
@@ -181,16 +146,33 @@ public class BaseTracingTestIT extends ParallelStatsDisabledIT {
         @Override
         protected Connection getConnection(String tableName) {
             try {
-                Connection connection = new CountDownConnection(getConnectionWithoutTracing(),
latch);
-                if(!traceTableExists(connection, tableName)) {
+                Connection connection =
+                        new CountDownConnection(getConnectionWithoutTracing(), latch);
+                if (!traceTableExists(connection, tableName)) {
                     createTable(connection, tableName);
                 }
                 return connection;
             } catch (SQLException e) {
-                e.printStackTrace();
+                LOG.error("New connection failed for tracing Table: " + tableName, e);
+                return null;
             }
-            return null;
         }
+
+        @Override
+        protected TraceSpanReceiver getTraceSpanReceiver() {
+            return traceSpanReceiver;
+        }
+
+        public void stop() {
+            if (executor == null) return;
+            try {
+                executor.shutdownNow();
+                executor.awaitTermination(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOG.error("Failed to stop the thread. ", e);
+            }
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e838afe4/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
index 88ab6ff..c753c7a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.htrace.Span;
+import org.apache.htrace.Tracer;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.TraceReader.SpanInfo;
 import org.apache.phoenix.trace.TraceReader.TraceHolder;
@@ -37,7 +38,6 @@ import org.junit.Test;
  */
 public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
 
-    private TestTraceWriter testTraceWriter;
     /**
      * IT should create the target table if it hasn't been created yet, but not fail if the
table
      * has already been created
@@ -46,7 +46,6 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
     @Test
     public void testCreatesTable() throws Exception {
 
-        testTraceWriter = new TestTraceWriter(generateUniqueName(), defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
         Connection conn = getConnectionWithoutTracing();
 
         // check for existence of the tracing table
@@ -58,7 +57,6 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
         } catch (Exception e) {
             // expected
         }
-
     }
 
     /**
@@ -70,10 +68,8 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
     public void writeMetrics() throws Exception {
 
         Connection conn = getConnectionWithoutTracing();
-        String tableName = generateUniqueName();
-        TraceSpanReceiver traceSpanReceiver = new TraceSpanReceiver();
         latch = new CountDownLatch(1);
-        testTraceWriter = new TestTraceWriter(tableName, defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
+        testTraceWriter.start();
 
         // create a simple metrics record
         long traceid = 987654;
@@ -88,12 +84,12 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
         Span span = createNewSpan(traceid, parentid, spanid, description, startTime, endTime,
             processid, annotation);
 
-        traceSpanReceiver.getSpanQueue().add(span);
+        Tracer.getInstance().deliver(span);
         assertTrue("Span never committed to table", latch.await(30, TimeUnit.SECONDS));
 
         // make sure we only get expected stat entry (matcing the trace id), otherwise we
could the
         // stats for the update as well
-        TraceReader reader = new TraceReader(conn, tableName);
+        TraceReader reader = new TraceReader(conn, tracingTableName);
         Collection<TraceHolder> traces = reader.readAll(10);
         assertEquals("Wrong number of traces in the tracing table", 1, traces.size());
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e838afe4/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
index 99c1f34..03510dc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
@@ -34,20 +34,13 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Span;
-import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
+import org.apache.htrace.*;
 import org.apache.htrace.impl.ProbabilitySampler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.trace.TraceReader.SpanInfo;
 import org.apache.phoenix.trace.TraceReader.TraceHolder;
-import org.apache.phoenix.trace.util.Tracing;
-import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableMap;
@@ -62,22 +55,13 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
     private static final int MAX_RETRIES = 10;
     private String enabledForLoggingTable;
     private String enableForLoggingIndex;
-    private String tracingTableName;
-    private TestTraceWriter testTraceWriter = null;
 
     @Before
     public void setupMetrics() throws Exception {
-        tracingTableName = "TRACING_" + generateUniqueName();
         enabledForLoggingTable = "ENABLED_FOR_LOGGING_" + generateUniqueName();
         enableForLoggingIndex = "ENABALED_FOR_LOGGING_INDEX_" + generateUniqueName();
     }
 
-    @After
-    public void cleanUp() {
-        if(testTraceWriter != null)
-            testTraceWriter.stop();
-    }
-
     /**
      * Simple test that we can correctly write spans to the phoenix table
      * @throws Exception on failure
@@ -85,10 +69,11 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
     @Test
     public void testWriteSpans() throws Exception {
 
+        LOG.info("testWriteSpans TableName: " + tracingTableName);
         // watch our sink so we know when commits happen
         latch = new CountDownLatch(1);
 
-        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
+        testTraceWriter.start();
 
         // write some spans
         TraceScope trace = Trace.startSpan("Start write test", Sampler.ALWAYS);
@@ -106,7 +91,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         trace.close();
 
         // pass the trace on
-        Tracing.getTraceSpanReceiver().receiveSpan(span);
+        Tracer.getInstance().deliver(span);
 
         // wait for the tracer to actually do the write
         assertTrue("Sink not flushed. commit() not called on the connection", latch.await(60,
TimeUnit.SECONDS));
@@ -148,9 +133,10 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
     @Test
     public void testClientServerIndexingTracing() throws Exception {
 
+        LOG.info("testClientServerIndexingTracing TableName: " + tracingTableName);
         // one call for client side, one call for server side
         latch = new CountDownLatch(2);
-        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
+        testTraceWriter.start();
 
         // separate connection so we don't create extra traces
         Connection conn = getConnectionWithoutTracing();
@@ -225,13 +211,16 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
 
     @Test
     public void testScanTracing() throws Exception {
+
+        LOG.info("testScanTracing TableName: " + tracingTableName);
+
         // separate connections to minimize amount of traces that are generated
         Connection traceable = getTracingConnection();
         Connection conn = getConnectionWithoutTracing();
 
         // one call for client side, one call for server side
         latch = new CountDownLatch(2);
-        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
+        testTraceWriter.start();
 
         // create a dummy table
         createTestTable(conn, false);
@@ -275,13 +264,16 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
 
     @Test
     public void testScanTracingOnServer() throws Exception {
+
+        LOG.info("testScanTracingOnServer TableName: " + tracingTableName);
+
         // separate connections to minimize amount of traces that are generated
         Connection traceable = getTracingConnection();
         Connection conn = getConnectionWithoutTracing();
 
         // one call for client side, one call for server side
         latch = new CountDownLatch(5);
-        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
+        testTraceWriter.start();
 
         // create a dummy table
         createTestTable(conn, false);
@@ -324,6 +316,9 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
 
     @Test
     public void testCustomAnnotationTracing() throws Exception {
+
+        LOG.info("testCustomAnnotationTracing TableName: " + tracingTableName);
+
     	final String customAnnotationKey = "myannot";
     	final String customAnnotationValue = "a1";
     	final String tenantId = "tenant1";
@@ -333,7 +328,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
 
         // one call for client side, one call for server side
         latch = new CountDownLatch(2);
-        testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
+        testTraceWriter.start();
 
         // create a dummy table
         createTestTable(conn, false);
@@ -425,21 +420,22 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
     @Test
     public void testSingleSpan() throws Exception {
 
+        LOG.info("testSingleSpan TableName: " + tracingTableName);
+
         Properties props = new Properties(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
-        String tableName = generateUniqueName();
         latch = new CountDownLatch(1);
-        testTraceWriter = new TestTraceWriter(tableName, defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
+        testTraceWriter.start();
 
         // create a simple metrics record
         long traceid = 987654;
         Span span = createNewSpan(traceid, Span.ROOT_SPAN_ID, 10, "root", 12, 13, "Some process",
"test annotation for a span");
 
-        Tracing.getTraceSpanReceiver().receiveSpan(span);
+        Tracer.getInstance().deliver(span);
         assertTrue("Updates not written in table", latch.await(60, TimeUnit.SECONDS));
 
         // start a reader
-        validateTraces(Collections.singletonList(span), conn, traceid, tableName);
+        validateTraces(Collections.singletonList(span), conn, traceid, tracingTableName);
     }
 
     /**
@@ -450,10 +446,11 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
     @Test
     public void testMultipleSpans() throws Exception {
 
+        LOG.info("testMultipleSpans TableName: " + tracingTableName);
+
         Connection conn = getConnectionWithoutTracing();
-        String tableName = generateUniqueName();
         latch = new CountDownLatch(4);
-        testTraceWriter = new TestTraceWriter(tableName, defaultTracingThreadPoolForTest,
defaultTracingBatchSizeForTest);
+        testTraceWriter.start();
 
         // create a simple metrics record
         long traceid = 12345;
@@ -483,12 +480,12 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         spans.add(span);
 
         for(Span span1 : spans)
-            Tracing.getTraceSpanReceiver().receiveSpan(span1);
+            Tracer.getInstance().deliver(span1);
 
         assertTrue("Updates not written in table", latch.await(100, TimeUnit.SECONDS));
 
         // start a reader
-        validateTraces(spans, conn, traceid, tableName);
+        validateTraces(spans, conn, traceid, tracingTableName);
     }
 
     private void validateTraces(List<Span> spans, Connection conn, long traceid, String
tableName)
@@ -583,4 +580,4 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e838afe4/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
index fc58c5e..122ae28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.htrace.Span;
@@ -67,7 +68,7 @@ public class TraceSpanReceiver implements SpanReceiver {
 
     private static final int CAPACITY = QueryServicesOptions.withDefaults().getTracingTraceBufferSize();
 
-    private static BlockingQueue<Span> spanQueue = null;
+    private BlockingQueue<Span> spanQueue = null;
 
     public TraceSpanReceiver() {
         this.spanQueue = new ArrayBlockingQueue<Span>(CAPACITY);
@@ -89,7 +90,15 @@ public class TraceSpanReceiver implements SpanReceiver {
         // noop
     }
 
-    protected BlockingQueue<Span> getSpanQueue() {
-        return spanQueue;
+    boolean isSpanAvailable() {
+        return spanQueue.isEmpty();
+    }
+
+    Span getSpan() {
+        return spanQueue.poll();
+    }
+
+    int getNumSpans() {
+        return spanQueue.size();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e838afe4/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java
index 938baa2..e823359 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java
@@ -34,8 +34,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -64,9 +62,10 @@ import com.google.common.base.Joiner;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * Sink for the trace spans pushed into the queue by {@link TraceSpanReceiver}. The class
instantiates a thread pool
- * of configurable size, which will pull the data from queue and write to the Phoenix Trace
Table in batches. Various
- * configuration options include thread pool size and batch commit size.
+ * Sink for the trace spans pushed into the queue by {@link TraceSpanReceiver}. The class
+ * instantiates a thread pool of configurable size, which will pull the data from queue and
write to
+ * the Phoenix Trace Table in batches. Various configuration options include thread pool
size and
+ * batch commit size.
  */
 public class TraceWriter {
     private static final Log LOG = LogFactory.getLog(TraceWriter.class);
@@ -89,42 +88,43 @@ public class TraceWriter {
     private static final Joiner COMMAS = Joiner.on(',');
 
     private String tableName;
-    private int BATCH_SIZE;
-    private int NUM_THREADS;
+    private int batchSize;
+    private int numThreads;
+    private TraceSpanReceiver traceSpanReceiver;
 
-    protected BlockingQueue<Span> spanQueue;
-
-    private ScheduledExecutorService executor;
+    protected ScheduledExecutorService executor;
 
     public TraceWriter(String tableName, int numThreads, int batchSize) {
 
-        this.BATCH_SIZE = batchSize;
-        this.NUM_THREADS = numThreads;
+        this.batchSize = batchSize;
+        this.numThreads = numThreads;
         this.tableName = tableName;
+    }
+
+    public void start() {
+
+        traceSpanReceiver = getTraceSpanReceiver();
+        if (traceSpanReceiver == null) {
+            LOG.warn(
+                "No receiver has been initialized for TraceWriter. Traces will not be written.");
+            LOG.warn("Restart Phoenix to try again.");
+            return;
+        }
 
-        TraceSpanReceiver traceSpanReceiver = Tracing.getTraceSpanReceiver();
-        spanQueue = traceSpanReceiver != null ? traceSpanReceiver.getSpanQueue() : null;
-        // TraceWriter should be instantiated only once, however when multiple JUnit Test
runs continuously, each of them initialize their own class.
-        // To prevent them from interfering with each other, its safe to clear the queue.
-        if(spanQueue != null)
-            spanQueue.clear();
         ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
         builder.setDaemon(true).setNameFormat("PHOENIX-METRICS-WRITER");
-        executor = Executors.newScheduledThreadPool(NUM_THREADS, builder.build());
-        for (int i = 0; i < NUM_THREADS; i++) {
+        executor = Executors.newScheduledThreadPool(this.numThreads, builder.build());
+
+        for (int i = 0; i < this.numThreads; i++) {
             executor.scheduleAtFixedRate(new FlushMetrics(), 0, 10, TimeUnit.SECONDS);
         }
+
         LOG.info("Writing tracing metrics to phoenix table");
     }
 
     @VisibleForTesting
-    public void stop() {
-        try {
-            executor.awaitTermination(5, TimeUnit.SECONDS);
-            executor.shutdownNow();
-        } catch (InterruptedException e) {
-            LOG.error("Failed to stop the thread. ", e);
-        }
+    protected TraceSpanReceiver getTraceSpanReceiver() {
+        return Tracing.getTraceSpanReceiver();
     }
 
     public class FlushMetrics implements Runnable {
@@ -138,15 +138,16 @@ public class TraceWriter {
 
         @Override
         public void run() {
-
-            if(conn == null) return;
-            while (!spanQueue.isEmpty()) {
-                Span span = spanQueue.poll();
+            if (conn == null) return;
+            while (!traceSpanReceiver.isSpanAvailable()) {
+                Span span = traceSpanReceiver.getSpan();
                 if (null == span) break;
-                LOG.info("Span received: " + span.toJson());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Span received: " + span.toJson());
+                }
                 addToBatch(span);
                 counter++;
-                if (counter >= BATCH_SIZE) {
+                if (counter >= batchSize) {
                     commitBatch(conn);
                     counter = 0;
                 }
@@ -187,19 +188,22 @@ public class TraceWriter {
 
             // add the tags to the span. They were written in order received so we mark them
as such
             for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
-                addDynamicEntry(keys, values, variableValues, TAG_FAMILY, Long.toString(ta.getTime()),
ta.getMessage(), TAG, tagCount);
+                addDynamicEntry(keys, values, variableValues, TAG_FAMILY,
+                    Long.toString(ta.getTime()), ta.getMessage(), TAG, tagCount);
                 tagCount++;
             }
 
-              // add the annotations. We assume they are serialized as strings and integers,
but that can
-              // change in the future
-              Map<byte[], byte[]> annotations = span.getKVAnnotations();
-              for (Map.Entry<byte[], byte[]> annotation : annotations.entrySet()) {
-                  Pair<String, String> val =
-                          TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue());
-                  addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, val.getFirst(),
val.getSecond(), ANNOTATION, annotationCount);
-                  annotationCount++;
-              }
+            // add the annotations. We assume they are serialized as strings and integers,
but that
+            // can
+            // change in the future
+            Map<byte[], byte[]> annotations = span.getKVAnnotations();
+            for (Map.Entry<byte[], byte[]> annotation : annotations.entrySet()) {
+                Pair<String, String> val =
+                        TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue());
+                addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, val.getFirst(),
+                    val.getSecond(), ANNOTATION, annotationCount);
+                annotationCount++;
+            }
 
             // add the tag count, now that we know it
             keys.add(TAG_COUNT);
@@ -225,15 +229,16 @@ public class TraceWriter {
                     ps.setString(index++, tag);
                 }
 
-                // Not going through the standard route of using statement.execute() as that
code path
+                // Not going through the standard route of using statement.execute() as that
code
+                // path
                 // is blocked if the metadata hasn't been been upgraded to the new minor
release.
-                 MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt);
-                 MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
-                 MutationState newState = plan.execute();
-                 state.join(newState);
+                MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt);
+                MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+                MutationState newState = plan.execute();
+                state.join(newState);
             } catch (SQLException e) {
-                LOG.error(
-                    "Could not write metric: \n" + span + " to prepared statement:\n" + stmt,
e);
+                LOG.error("Could not write metric: \n" + span + " to prepared statement:\n"
+ stmt,
+                    e);
             }
         }
     }
@@ -243,15 +248,15 @@ public class TraceWriter {
     }
 
     private void addDynamicEntry(List<String> keys, List<Object> values,
-            List<String> variableValues, String family, String desc, String value,
MetricInfo metric,
-            int count) {
+            List<String> variableValues, String family, String desc, String value,
+            MetricInfo metric, int count) {
         // <family><.dynColumn><count> <VARCHAR>
-            keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR");
+        keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR");
 
-            // build the annotation value
-            String val = desc + " - " + value;
-            values.add(VARIABLE_VALUE);
-            variableValues.add(val);
+        // build the annotation value
+        String val = desc + " - " + value;
+        values.add(VARIABLE_VALUE);
+        variableValues.add(val);
     }
 
     protected Connection getConnection(String tableName) {
@@ -271,7 +276,10 @@ public class TraceWriter {
                 "Created new connection for tracing " + conn.toString() + " Table: " + tableName);
             return conn;
         } catch (Exception e) {
-            LOG.error("New connection failed for tracing Table: " + tableName, e);
+            LOG.error("Tracing will NOT be pursued. New connection failed for tracing Table:
"
+                    + tableName,
+                e);
+            LOG.error("Restart Phoenix to retry.");
             return null;
         }
     }
@@ -316,7 +324,9 @@ public class TraceWriter {
         try {
             conn.commit();
         } catch (SQLException e) {
-            LOG.error("Unable to commit traces on conn: " + conn.toString() + " to table:
" + tableName, e);
+            LOG.error(
+                "Unable to commit traces on conn: " + conn.toString() + " to table: " + tableName,
+                e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e838afe4/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
index 94a89b8..35cc6dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
@@ -267,13 +267,12 @@ public class Tracing {
      */
     public synchronized static void addTraceMetricsSource() {
         try {
-            if (!initialized) {
+            QueryServicesOptions options = QueryServicesOptions.withDefaults();
+            if (!initialized && options.isTracingEnabled()) {
                 traceSpanReceiver = new TraceSpanReceiver();
                 Trace.addReceiver(traceSpanReceiver);
-                if(QueryServicesOptions.withDefaults().isTracingEnabled()) {
-                    QueryServicesOptions options = QueryServicesOptions.withDefaults();
-                    new TraceWriter(options.getTableName(), options.getTracingThreadPoolSize(),
options.getTracingBatchSize());
-                }
+                TraceWriter traceWriter = new TraceWriter(options.getTableName(), options.getTracingThreadPoolSize(),
options.getTracingBatchSize());
+                traceWriter.start();
             }
         } catch (RuntimeException e) {
             LOG.warn("Tracing will outputs will not be written to any metrics sink! No "

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e838afe4/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java
deleted file mode 100644
index 37cea88..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.trace;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.htrace.Span;
-import org.apache.htrace.impl.MilliSpan;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Test that the @{link TraceSpanReceiver} correctly handles different kinds of traces
- */
-public class TraceMetricsSourceTest {
-
-  @BeforeClass
-  public static void setup() throws Exception{
-  }
-
-  /**
-   * For PHOENIX-1126, Phoenix originally assumed all the annotation values were integers,
-   * but HBase writes some strings as well, so we need to be able to handle that too
-   */
-  @Test
-  public void testNonIntegerAnnotations(){
-    Span span = getSpan();
-    // make sure its less than the length of an integer
-
-    byte[] value = Bytes.toBytes("a");
-    byte[] someInt = Bytes.toBytes(1);
-    assertTrue(someInt.length > value.length);
-
-    // an annotation that is not an integer
-    span.addKVAnnotation(Bytes.toBytes("key"), value);
-
-    // Create the sink and write the span
-    TraceSpanReceiver source = new TraceSpanReceiver();
-    source.receiveSpan(span);
-
-    assertTrue(source.getSpanQueue().size() == 1);
-  }
-
-  @Test
-  public void testIntegerAnnotations(){
-    Span span = getSpan();
-
-    // add annotation through the phoenix interfaces
-    TracingUtils.addAnnotation(span, "message", 10);
-
-    TraceSpanReceiver source = new TraceSpanReceiver();
-    source.receiveSpan(span);
-
-    assertTrue(source.getSpanQueue().size() == 1);
-  }
-
-  private Span getSpan(){
-    // Spans with Trace Id as 0 will be rejected (See PHOENIX-3767 for details)
-    return new MilliSpan("test span", 1, 1 , 2, "pid");
-  }
-}


Mime
View raw message