htrace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [09/12] incubator-htrace git commit: adding review changes according to Colin
Date Wed, 11 Apr 2018 20:26:59 GMT
adding review changes according to Colin


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/0cd394f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/0cd394f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/0cd394f5

Branch: refs/heads/master
Commit: 0cd394f5788b5738dde7169f62e94ab2c8e2b31e
Parents: 8ccbb89
Author: Nisala Nirmana <nisala12@gmail.com>
Authored: Sat Aug 20 00:04:01 2016 +0530
Committer: Nisala Nirmana <nisala12@gmail.com>
Committed: Sat Aug 20 00:04:01 2016 +0530

----------------------------------------------------------------------
 .../htrace/impl/KuduClientConfiguration.java    |   6 +-
 .../htrace/impl/KuduReceiverConstants.java      |  22 +-
 .../apache/htrace/impl/KuduSpanReceiver.java    | 267 +++++--------------
 .../htrace/impl/TestKuduSpanReceiver.java       |  28 +-
 .../apache/htrace/impl/TestKuduSpanViewer.java  | 237 ++++++++++++++++
 5 files changed, 333 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
----------------------------------------------------------------------
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
index 25d6fc4..c13e7b4 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
@@ -23,7 +23,7 @@ import org.kududb.client.KuduClient.KuduClientBuilder;
 public class KuduClientConfiguration {
 
   private final String host;
-  private final String port;
+  private final Integer port;
   private final Integer workerCount;
   private final Integer bossCount;
   private final Boolean isStatisticsEnabled;
@@ -32,7 +32,7 @@ public class KuduClientConfiguration {
   private final Long socketReadTimeout;
 
   public KuduClientConfiguration(String host,
-                                 String port,
+                                 Integer port,
                                  Integer workerCount,
                                  Integer bossCount,
                                  Boolean isStatisticsEnabled,
@@ -52,7 +52,7 @@ public class KuduClientConfiguration {
 
   public KuduClient buildClient() {
     KuduClientBuilder builder = new KuduClient
-            .KuduClientBuilder(host.concat(":").concat(port));
+            .KuduClientBuilder(host.concat(":").concat(port.toString()));
     if (workerCount != null) {
       builder.workerCount(workerCount);
     }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
----------------------------------------------------------------------
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
index be98311..805ec80 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
@@ -23,39 +23,21 @@ public class KuduReceiverConstants {
   static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1";
   static final String KUDU_MASTER_PORT_KEY = "kudu.master.port";
   static final String DEFAULT_KUDU_MASTER_PORT = "7051";
-  static final String SPAN_BLOCKING_QUEUE_SIZE_KEY = "kudu.span.queue.size";
-  static final int DEFAULT_SPAN_BLOCKING_QUEUE_SIZE = 1000;
   static final String KUDU_SPAN_TABLE_KEY = "kudu.span.table";
   static final String DEFAULT_KUDU_SPAN_TABLE = "span";
   static final String KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY = "kudu.span.timeline.annotation.table";
   static final String DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE = "span.timeline";
-  static final String MAX_SPAN_BATCH_SIZE_KEY = "kudu.batch.size";
-  static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100;
-  static final String NUM_PARALLEL_THREADS_KEY = "kudu.num.threads";
-  static final int DEFAULT_NUM_PARALLEL_THREADS = 1;
-  static final String KUDU_COLUMN_SPAN_TRACE_ID_KEY = "kudu.column.span.traceid";
   static final String DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID = "trace_id";
-  static final String KUDU_COLUMN_SPAN_START_TIME_KEY = "kudu.column.span.starttime";
   static final String DEFAULT_KUDU_COLUMN_SPAN_START_TIME = "start_time";
-  static final String KUDU_COLUMN_SPAN_STOP_TIME_KEY = "kudu.column.span.stoptime";
   static final String DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME = "stop_time";
-  static final String KUDU_COLUMN_SPAN_SPAN_ID_KEY = "kudu.column.span.spanid";
   static final String DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID = "span_id";
-  static final String KUDU_COLUMN_SPAN_PROCESS_ID_KEY = "kudu.column.span.processid";
-  static final String DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID = "process_id";
-  static final String KUDU_COLUMN_SPAN_PARENT_ID_KEY = "kudu.column.span.parentid";
-  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID = "parent_id";
-  static final String KUDU_COLUMN_SPAN_DESCRIPTION_KEY = "kudu.column.span.description";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW = "parent_id_low";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH = "parent_id_high";
   static final String DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION = "description";
-  static final String KUDU_COLUMN_SPAN_PARENT_KEY = "kudu.column.span.parent";
   static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT = "parent";
-  static final String KUDU_COLUMN_TIMELINE_TIME_KEY = "kudu.column.timeline.time";
   static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIME = "time";
-  static final String KUDU_COLUMN_TIMELINE_MESSAGE_KEY = "kudu.column.timeline.message";
   static final String DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE = "message";
-  static final String KUDU_COLUMN_TIMELINE_SPANID_KEY = "kudu.column.timeline.spanid";
   static final String DEFAULT_KUDU_COLUMN_TIMELINE_SPANID = "spanid";
-  static final String KUDU_COLUMN_TIMELINE_TIMELINEID_KEY = "kudu.column.timeline.timelineid";
   static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID = "timelineid";
   static final String KUDU_CLIENT_WORKER_COUNT_KEY = "kudu.client.worker.count";
   static final String KUDU_CLIENT_BOSS_COUNT_KEY = "kudu.client.boss.count";

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
index 46c324a..745f24d 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
@@ -30,49 +30,22 @@ import org.kududb.client.Insert;
 import org.kududb.client.PartialRow;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class KuduSpanReceiver extends SpanReceiver {
 
   private static final Log LOG = LogFactory.getLog(KuduSpanReceiver.class);
 
-  private static final int SHUTDOWN_TIMEOUT = 30;
-  private static final int MAX_ERRORS = 10;
-  private final BlockingQueue<Span> queue;
-  private final AtomicBoolean running = new AtomicBoolean(true);
   private final KuduClientConfiguration clientConf;
-  private final int maxSpanBatchSize;
-  private final ThreadFactory threadFactory = new ThreadFactory() {
-    private final AtomicLong receiverIndex = new AtomicLong(0);
-
-    @Override
-    public Thread newThread(Runnable runnable) {
-      Thread thread = new Thread(runnable);
-      thread.setDaemon(true);
-      thread.setName(String.format("kuduSpanReceiver-%d",
-              receiverIndex.getAndIncrement()));
-      return thread;
-    }
-  };
-
-  private ExecutorService service;
+  private KuduSession session;
+  private KuduClient client;
 
   private String table_span;
   private String column_span_trace_id;
   private String column_span_start_time;
   private String column_span_stop_time;
   private String column_span_span_id;
-  private String column_span_process_id;
-  private String column_span_parent_id;
+  private String column_span_parent_id_low;
+  private String column_span_parent_id_high;
   private String column_span_description;
   private String column_span_parent;
 
@@ -85,7 +58,7 @@ public class KuduSpanReceiver extends SpanReceiver {
   public KuduSpanReceiver(HTraceConfiguration conf) {
 
     String masterHost;
-    String masterPort;
+    Integer masterPort;
     Integer workerCount;
     Integer bossCount;
     Boolean isStatisticsEnabled;
@@ -95,8 +68,8 @@ public class KuduSpanReceiver extends SpanReceiver {
 
     masterHost = conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
             KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST);
-    masterPort = conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT);
+    masterPort = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT));
 
     if (conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY) != null) {
       bossCount = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY));
@@ -137,186 +110,86 @@ public class KuduSpanReceiver extends SpanReceiver {
             adminOperationTimeout,
             operationTimeout,
             socketReadTimeout);
-
-    this.queue = new ArrayBlockingQueue<Span>(conf.getInt(KuduReceiverConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY,
-            KuduReceiverConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE));
-
+    //table names made configurable
     this.table_span = conf.get(KuduReceiverConstants.KUDU_SPAN_TABLE_KEY, KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
-    this.table_timeline= conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY,
+    this.table_timeline = conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY,
             KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
-
-    this.column_span_trace_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_TRACE_ID_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
-    this.column_span_start_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_START_TIME_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
-    this.column_span_stop_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_STOP_TIME_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
-    this.column_span_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_SPAN_ID_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID);
-    this.column_span_process_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PROCESS_ID_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID);
-    this.column_span_parent_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_ID_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID);
-    this.column_span_description = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_DESCRIPTION_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
-    this.column_span_parent = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT);
-    this.column_timeline_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIME_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
-    this.column_timeline_message = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_MESSAGE_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
-    this.column_timeline_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_SPANID_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID);
-    this.column_timeline_timeline_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIMELINEID_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID);
-
-    this.maxSpanBatchSize = conf.getInt(KuduReceiverConstants.MAX_SPAN_BATCH_SIZE_KEY,
-            KuduReceiverConstants.DEFAULT_MAX_SPAN_BATCH_SIZE);
-    if (this.service != null) {
-      this.service.shutdownNow();
-      this.service = null;
-    }
-    int numThreads = conf.getInt(KuduReceiverConstants.NUM_PARALLEL_THREADS_KEY,
-            KuduReceiverConstants.DEFAULT_NUM_PARALLEL_THREADS);
-    this.service = Executors.newFixedThreadPool(numThreads, threadFactory);
-    for (int i = 0; i < numThreads; i++) {
-      this.service.submit(new KuduSpanReceiver.WriteSpanRunnable());
+    //default column names have used
+    this.column_span_trace_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID;
+    this.column_span_start_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME;
+    this.column_span_stop_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME;
+    this.column_span_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID;
+    this.column_span_parent_id_low = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW;
+    this.column_span_parent_id_high = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH;
+    this.column_span_description = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION;
+    this.column_span_parent = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT;
+    this.column_timeline_time = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME;
+    this.column_timeline_message = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE;
+    this.column_timeline_span_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID;
+    this.column_timeline_timeline_id = KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID;
+    //kudu backend session initialization
+    if (this.session == null) {
+      if (this.client == null) {
+        client = clientConf.buildClient();
+      }
+      session = client.newSession();
     }
   }
 
   @Override
   public void close() throws IOException {
-    running.set(false);
-    service.shutdown();
     try {
-      if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
-        LOG.error("Timeout " + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +
-                " reached while shutting worker threads which process enqued spans." +
-                " Enqueued spans which are left in blocking queue is dropped.");
+      if (this.session != null) {
+        if (this.session.isClosed()) {
+          this.session.close();
+        }
+        this.client.close();
       }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted exception occured while terminating thread service executor.", e);
+    } catch (java.lang.Exception e) {
+      LOG.warn("Failed to close Kudu session. " + e.getMessage());
     }
   }
 
   @Override
   public void receiveSpan(Span span) {
-    if (running.get()) {
-      try {
-        this.queue.add(span);
-      } catch (IllegalStateException e) {
-        LOG.error("Error trying to enqueue span ("
-                + span.getDescription()
-                + ") to the queue. Blocking Queue is currently reached its capacity.");
+   try {
+      KuduTable tableSpan = client.openTable(table_span);
+      Insert spanInsert = tableSpan.newInsert();
+      PartialRow spanRow = spanInsert.getRow();
+      spanRow.addLong(column_span_trace_id, span.getSpanId().getLow());
+      spanRow.addLong(column_span_start_time, span.getStartTimeMillis());
+      spanRow.addLong(column_span_stop_time, span.getStopTimeMillis());
+      spanRow.addLong(column_span_span_id, span.getSpanId().getHigh());
+      if (span.getParents().length == 0) {
+        spanRow.addLong(column_span_parent_id_low, 0);
+        spanRow.addLong(column_span_parent_id_high, 0);
+        spanRow.addBoolean(column_span_parent, true);
+      } else if (span.getParents().length > 0) {
+        spanRow.addLong(column_span_parent_id_low, span.getParents()[0].getLow());
+        spanRow.addLong(column_span_parent_id_high, span.getParents()[0].getHigh());
+        spanRow.addBoolean(column_span_parent, false);
       }
-    }
-  }
-
-  private class WriteSpanRunnable implements Runnable {
-
-    private KuduSession session;
-    private KuduClient client;
-
-    @Override
-    public void run() {
-      List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize);
-      long errorCount = 0;
-      while (running.get() || queue.size() > 0) {
-        Span firstSpan = null;
-        try {
-          firstSpan = queue.poll(1, TimeUnit.SECONDS);
-          if (firstSpan != null) {
-            dequeuedSpans.add(firstSpan);
-            queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
-          }
-        } catch (InterruptedException ie) {
-          LOG.error("Interrupted Exception occurred while polling to " +
-                  "retrieve first span from blocking queue");
-        }
-        startSession();
-        if (dequeuedSpans.isEmpty()) {
-          try {
-            this.session.flush();
-          } catch (java.lang.Exception e) {
-            LOG.error("Failed to flush writes to Kudu.");
-            closeSession();
-          }
-          continue;
-        }
-        try {
-          for (Span span : dequeuedSpans) {
-            KuduTable tableSpan = client.openTable(table_span);
-            Insert spanInsert = tableSpan.newInsert();
-            PartialRow spanRow = spanInsert.getRow();
-            spanRow.addLong(column_span_trace_id,span.getSpanId().getHigh());
-            spanRow.addLong(column_span_start_time,span.getStartTimeMillis());
-            spanRow.addLong(column_span_stop_time,span.getStopTimeMillis());
-            spanRow.addLong(column_span_span_id,span.getSpanId().getLow());
-            spanRow.addString(column_span_process_id,span.getTracerId());
-            if (span.getParents().length == 0) {
-              spanRow.addLong(column_span_parent_id,0);
-              spanRow.addBoolean(column_span_parent,false);
-            } else if (span.getParents().length > 0) {
-              spanRow.addLong(column_span_parent_id,span.getParents()[0].getLow());
-              spanRow.addBoolean(column_span_parent,true);
-            }
-            spanRow.addString(column_span_description,span.getDescription());
-            this.session.apply(spanInsert);
-            long annotationCounter = 0;
-            for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
-              annotationCounter++;
-              KuduTable tableTimeline = client.openTable(table_timeline);
-              Insert timelineInsert = tableTimeline.newInsert();
-              PartialRow timelineRow = timelineInsert.getRow();
-              timelineRow.addLong(column_timeline_timeline_id,span.getSpanId().getHigh()+annotationCounter);
-              timelineRow.addLong(column_timeline_time,ta.getTime());
-              timelineRow.addString(column_timeline_message,ta.getMessage());
-              timelineRow.addLong(column_timeline_span_id,span.getSpanId().getHigh());
-              this.session.apply(timelineInsert);
-            }
-          }
-          dequeuedSpans.clear();
-          errorCount = 0;
-        } catch (Exception e) {
-          errorCount += 1;
-          if (errorCount < MAX_ERRORS) {
-            try {
-              queue.addAll(dequeuedSpans);
-            } catch (IllegalStateException ex) {
-              LOG.error("Exception occured while writing spans kudu datastore. " +
-                      "Trying to re-enqueue de-queued spans to blocking queue for writing but failed. " +
-                      "Dropped " + dequeuedSpans.size() + " dequeued span(s) which were due written" +
-                      "into kudu datastore");
-            }
-          }
-          closeSession();
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e1) {
-            LOG.error("Interrupted Exception occurred while allowing kudu to re-stabilized");
-          }
-        }
+      spanRow.addString(column_span_description, span.getDescription());
+      session.apply(spanInsert);
+      long annotationCounter = 0;
+      for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
+        annotationCounter++;
+        KuduTable tableTimeline = client.openTable(table_timeline);
+        Insert timelineInsert = tableTimeline.newInsert();
+        PartialRow timelineRow = timelineInsert.getRow();
+        timelineRow.addLong(column_timeline_timeline_id, span.getSpanId().getLow() + annotationCounter);
+        timelineRow.addLong(column_timeline_time, ta.getTime());
+        timelineRow.addString(column_timeline_message, ta.getMessage());
+        timelineRow.addLong(column_timeline_span_id, span.getSpanId().getLow());
+        session.apply(timelineInsert);
       }
-      closeSession();
-    }
-
-    private void closeSession() {
+    } catch (java.lang.Exception ex) {
+      LOG.error("Failed to write span to Kudu backend", ex);
+    } finally {
       try {
-        if (this.session != null) {
-          this.session.close();
-          this.session = null;
-        }
-      } catch (java.lang.Exception e) {
-        LOG.warn("Failed to close Kudu session. " + e.getMessage());
-      }
-    }
-
-    private void startSession() {
-      if (this.session == null) {
-        if (this.client == null) {
-          client = clientConf.buildClient();
-        }
-        session = client.newSession();
+        session.flush();
+      } catch (java.lang.Exception ex) {
+        //Ignore
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
index 8b446e2..c13970d 100644
--- a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
+++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanReceiver.java
@@ -48,7 +48,7 @@ import java.util.List;
 public class TestKuduSpanReceiver extends BaseKuduTest {
 
   private static final String BIN_DIR_PROP = "binDir";
-  private static final String BIN_DIR_PROP_DEFAULT = "./build/release/bin";
+  private static final String BIN_DIR_PROP_DEFAULT = "../build/release/bin";
   //set kudu binary location and enable test execution from here
   private static final boolean TEST_ENABLE = false;
 
@@ -83,10 +83,10 @@ public class TestKuduSpanReceiver extends BaseKuduTest {
     span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID,
             Type.INT64)
             .build());
-    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID,
-            Type.STRING)
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH,
+            Type.INT64)
             .build());
-    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID,
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW,
             Type.INT64)
             .build());
     span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT,
@@ -137,6 +137,7 @@ public class TestKuduSpanReceiver extends BaseKuduTest {
                     KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]))
             .build();
     TraceScope scope = tracer.newScope("testKuduScope");
+    scope.addTimelineAnnotation("test");
     Span testSpan = scope.getSpan();
     scope.close();
     tracer.close();
@@ -147,6 +148,8 @@ public class TestKuduSpanReceiver extends BaseKuduTest {
     spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
     spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
     spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
+    spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH);
+    spanColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW);
     KuduScanner scanner = client.newScannerBuilder(client.openTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE))
             .setProjectedColumnNames(spanColumns)
             .build();
@@ -157,11 +160,18 @@ public class TestKuduSpanReceiver extends BaseKuduTest {
         RowResult result = results.next();
         long traceId = result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
         MilliSpan.Builder builder = new MilliSpan.Builder()
-                .spanId(new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID),
-                        result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID)))
+                .spanId(new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID),
+                        result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID)))
                 .description(result.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION))
                 .begin(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME))
                 .end(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME));
+        if (!(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH) == 0 &&
+                result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW) == 0)) {
+          SpanId[] parents = new SpanId[1];
+          parents[0] = new SpanId(result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH),
+                  result.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW));
+          builder.parents(parents);
+        }
         List<String> timelineColumns = new ArrayList<>();
         timelineColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
         timelineColumns.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
@@ -177,7 +187,7 @@ public class TestKuduSpanReceiver extends BaseKuduTest {
         while (timelineScanner.hasMoreRows()) {
           RowResultIterator timelineResults = timelineScanner.nextRows();
           while (timelineResults.hasNext()) {
-            RowResult timelineRow = results.next();
+            RowResult timelineRow = timelineResults.next();
             timelineList.add(new TimelineAnnotation
                     (timelineRow.getLong(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME),
                             timelineRow.getString(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE)));
@@ -193,6 +203,10 @@ public class TestKuduSpanReceiver extends BaseKuduTest {
     Assert.assertEquals(testSpan.getStartTimeMillis(), dbSpan.getStartTimeMillis());
     Assert.assertEquals(testSpan.getStopTimeMillis(), dbSpan.getStopTimeMillis());
     Assert.assertEquals(testSpan.getDescription(), dbSpan.getDescription());
+    Assert.assertEquals(testSpan.getTimelineAnnotations().get(0).getMessage(),
+            dbSpan.getTimelineAnnotations().get(0).getMessage());
+    Assert.assertEquals(testSpan.getTimelineAnnotations().get(0).getTime(),
+            dbSpan.getTimelineAnnotations().get(0).getTime());
     syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
     syncClient.deleteTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
   }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/0cd394f5/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java
----------------------------------------------------------------------
diff --git a/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java
new file mode 100644
index 0000000..7dd2807
--- /dev/null
+++ b/htrace-kudu/src/test/java/org/apache/htrace/impl/TestKuduSpanViewer.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.Tracer;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.TracerPool;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.apache.htrace.viewer.KuduSpanViewer;
+import org.junit.*;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduClient;
+import org.kududb.client.CreateTableOptions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class TestKuduSpanViewer extends BaseKuduTest {
+
+  private static final String BIN_DIR_PROP = "binDir";
+  private static final String BIN_DIR_PROP_DEFAULT = "../build/release/bin";
+  //set kudu binary location and enable test execution from here
+  private static final boolean TEST_ENABLE = false;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    if (TEST_ENABLE) {
+      System.setProperty(BIN_DIR_PROP, BIN_DIR_PROP_DEFAULT);
+      BaseKuduTest.setUpBeforeClass();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if(TEST_ENABLE) {
+      BaseKuduTest.tearDownAfterClass();
+    }
+  }
+
+  private void createTable() throws Exception {
+    KuduClient client = BaseKuduTest.syncClient;
+    List<ColumnSchema> span_columns = new ArrayList();
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID,
+            Type.INT64)
+            .key(true)
+            .build());
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME,
+            Type.INT64)
+            .build());
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME,
+            Type.INT64)
+            .build());
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID,
+            Type.INT64)
+            .build());
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_LOW,
+            Type.INT64)
+            .build());
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID_HIGH,
+            Type.INT64)
+            .build());
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT,
+            Type.BOOL)
+            .build());
+    span_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION,
+            Type.STRING)
+            .build());
+
+    List<String> rangeKeys = new ArrayList<>();
+    rangeKeys.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+    Schema schema = new Schema(span_columns);
+    client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE, schema,
+            new CreateTableOptions().setRangePartitionColumns(rangeKeys));
+
+    List<ColumnSchema> timeline_columns = new ArrayList();
+    timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder
+            (KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID, Type.INT64)
+            .key(true)
+            .build());
+    timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME,
+            Type.INT64)
+            .build());
+    timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder
+            (KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE, Type.STRING)
+            .build());
+    timeline_columns.add(new ColumnSchema.ColumnSchemaBuilder(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID,
+            Type.INT64)
+            .build());
+    List<String> rangeKeysTimeline = new ArrayList<>();
+    rangeKeysTimeline.add(KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID);
+    Schema timelineSchema = new Schema(timeline_columns);
+    client.createTable(KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE, timelineSchema,
+            new CreateTableOptions().setRangePartitionColumns(rangeKeysTimeline));
+  }
+
+
+  @Test
+  public void testSpanToJson() {
+    SpanId[] parent = new SpanId[1];
+    parent[0] = new SpanId(1,1);
+    MilliSpan.Builder builder = new MilliSpan.Builder()
+            .parents(parent)
+            .begin(1)
+            .end(2)
+            .spanId(new SpanId(10,20))
+            .description("description");
+    List<TimelineAnnotation> timelineList = new LinkedList<TimelineAnnotation>();
+    for (int i = 0; i < 3; i++) {
+      timelineList.add(new TimelineAnnotation(i,"message" + i));
+    }
+    builder.timeline(timelineList);
+    Span span = builder.build();
+    try {
+      String json = KuduSpanViewer.toJsonString(span);
+      String expected =
+              "{\"trace_id\":\"20\",\"span_id\":\"10\",\"description\":\"description\",\"parent_id\":\"1\"," +
+                      "\"start\":\"1\",\"stop\":\"2\",\"timeline\":[{\"time\":\"0\",\"message\":\"message0\",}{\"time\":\"1\"," +
+                      "\"message\":\"message1\",}{\"time\":\"2\",\"message\":\"message2\",}]}";
+      Assert.assertEquals(json, expected);
+    } catch (IOException e) {
+      Assert.fail("failed to get json from span. " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSpanWithoutTimelineToJson() {
+    SpanId[] parent = new SpanId[1];
+    parent[0] = new SpanId(200,111);
+    MilliSpan.Builder builder = new MilliSpan.Builder()
+            .parents(parent)
+            .begin(1)
+            .end(2)
+            .spanId(new SpanId(10,20))
+            .tracerId("pid")
+            .description("description");
+    Span span = builder.build();
+    try {
+      String json = KuduSpanViewer.toJsonString(span);
+      String expected =
+              "{\"trace_id\":\"20\",\"span_id\":\"10\",\"description\":\"description\"," +
+                      "\"parent_id\":\"111\",\"start\":\"1\",\"stop\":\"2\",}";
+      Assert.assertEquals(json, expected);
+    } catch (IOException e) {
+      Assert.fail("failed to get json from span. " + e.getMessage());
+    }
+  }
+
+  @Ignore
+  @Test
+  public void TestKuduSpanViewer() throws Exception {
+    createTable();
+    Tracer tracer = new Tracer.Builder().
+            name("testKuduSpanReceiver").
+            tracerPool(new TracerPool("testKuduSpanReceiver")).
+            conf(HTraceConfiguration.fromKeyValuePairs(
+                    "sampler.classes", "AlwaysSampler",
+                    "span.receiver.classes", "org.apache.htrace.impl.KuduSpanReceiver",
+                    KuduReceiverConstants.KUDU_MASTER_HOST_KEY, BaseKuduTest.getMasterAddresses().split(":")[0],
+                    KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]))
+            .build();
+    TraceScope scope = tracer.newScope("testKuduScope");
+    scope.addTimelineAnnotation("test");
+    Span testSpan = scope.getSpan();
+    TraceScope childScope = tracer.newScope("testKuduChildScope", new SpanId(100,200));
+    Span childScopeSpan = childScope.getSpan();
+    childScope.addTimelineAnnotation("testChild");
+    childScope.close();
+    scope.close();
+    tracer.close();
+    HTraceConfiguration conf = HTraceConfiguration
+            .fromKeyValuePairs(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
+                    BaseKuduTest.getMasterAddresses().split(":")[0],
+            KuduReceiverConstants.KUDU_MASTER_PORT_KEY, BaseKuduTest.getMasterAddresses().split(":")[1]);
+    KuduSpanViewer viewer = new KuduSpanViewer(conf);
+    List<Span> list = viewer.getRootSpans();
+    Assert.assertEquals(list.size(), 1);
+    Span span = viewer.getRootSpans().get(0);
+    try {
+      String json = KuduSpanViewer.toJsonString(span);
+      String expected = KuduSpanViewer.toJsonString(testSpan);
+      Assert.assertEquals(json, expected);
+    } catch (IOException e) {
+      Assert.fail("failed to get json from span. " + e.getMessage());
+    }
+    List<Span> list2 = viewer.getSpans(span.getSpanId().getHigh());
+    Assert.assertEquals(list2.size(), 2);
+    Span span2 = list2.get(0);
+    try {
+      String json = KuduSpanViewer.toJsonString(span2);
+      String expected = null;
+      if(span2.getParents().length != 0) {
+        expected = KuduSpanViewer.toJsonString(childScopeSpan);
+      } else {
+        expected = KuduSpanViewer.toJsonString(testSpan);
+      }
+      Assert.assertEquals(json, expected);
+    } catch (IOException e) {
+      Assert.fail("failed to get json from span. " + e.getMessage());
+    }
+    Span span3 = list2.get(1);
+    try {
+      String json = KuduSpanViewer.toJsonString(span3);
+      String expected = null;
+      if(span3.getParents().length != 0) {
+        expected = KuduSpanViewer.toJsonString(childScopeSpan);
+      } else {
+        expected = KuduSpanViewer.toJsonString(testSpan);
+      }
+      Assert.assertEquals(json, expected);
+    } catch (IOException e) {
+      Assert.fail("failed to get json from span. " + e.getMessage());
+    }
+  }
+}



Mime
View raw message