gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-263] Fix TaskExecutor metric calculation
Date Mon, 25 Sep 2017 18:11:01 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d6c7fe79b -> aea0e4227


[GOBBLIN-263] Fix TaskExecutor metric calculation

Closes #2115 from kadaan/Fix_for_GOBBLIN-263


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/aea0e422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/aea0e422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/aea0e422

Branch: refs/heads/master
Commit: aea0e422792b5ca35fdd13d1e4030fcb0925acf1
Parents: d6c7fe7
Author: Joel Baranick <joel.baranick@ensighten.com>
Authored: Mon Sep 25 11:10:56 2017 -0700
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Mon Sep 25 11:10:56 2017 -0700

----------------------------------------------------------------------
 .../apache/gobblin/runtime/TaskExecutor.java    | 57 ++++++++++++++------
 1 file changed, 40 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/aea0e422/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index 28ea378..476282c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -92,8 +92,8 @@ public class TaskExecutor extends AbstractIdleService {
   // the task waited to start.
   private final ConcurrentSkipListMap<Long, Long> queuedTaskTimeHistorical = new ConcurrentSkipListMap<>();
 
-  // The timestamp for the last time the metric source data was pruned.
-  private long lastCleanupTime = 0;
+  // The timestamp for the last time the metrics were calculated.
+  private long lastCalculationTime = 0;
 
   // The total number of tasks currently queued and queued over the historical lookback period.
   private AtomicInteger queuedTaskCount = new AtomicInteger();
@@ -282,19 +282,30 @@ public class TaskExecutor extends AbstractIdleService {
     return this.metricSet;
   }
 
-  private synchronized void cleanupMetricSources() {
+  private synchronized void calculateMetrics() {
     long currentTimeMillis = System.currentTimeMillis();
-    if (lastCleanupTime < currentTimeMillis - TimeUnit.SECONDS.toMillis(10)) {
+    if (lastCalculationTime < currentTimeMillis - TimeUnit.SECONDS.toMillis(10)) {
+      LOG.debug("Starting metric calculation.");
       int currentQueuedTaskCount = 0;
+      int futureQueuedTaskCount = 0;
       long currentQueuedTaskTotalTime = 0;
       for (Map.Entry<String, Long> queuedTask : this.queuedTasks.entrySet()) {
         if (queuedTask.getValue() <= currentTimeMillis) {
           currentQueuedTaskCount++;
-          currentQueuedTaskTotalTime += queuedTask.getValue();
+          long currentQueuedTaskTime = currentTimeMillis - queuedTask.getValue();
+          currentQueuedTaskTotalTime += currentQueuedTaskTime;
+          LOG.debug(String.format("Task %s has been waiting in the queue for %d ms.", queuedTask.getKey(), currentQueuedTaskTime));
+        } else {
+          futureQueuedTaskCount++;
         }
       }
+      if (futureQueuedTaskCount > 0) {
+        LOG.debug(String.format("%d tasks were ignored during metric calculations because they are scheduled to run in the future.", futureQueuedTaskCount));
+      }
+
       this.currentQueuedTaskCount.set(currentQueuedTaskCount);
       this.currentQueuedTaskTotalTime.set(currentQueuedTaskTotalTime);
+      LOG.debug(String.format("%d current tasks have been waiting for a total of %d ms.", currentQueuedTaskCount, currentQueuedTaskTotalTime));
 
       int historicalQueuedTaskCount = 0;
       long historicalQueuedTaskTotalTime = 0;
@@ -304,10 +315,12 @@ public class TaskExecutor extends AbstractIdleService {
         try {
           Map.Entry<Long, Long> historicalQueuedTask = iterator.next();
           if (historicalQueuedTask.getKey() < cutoff || historicalQueuedTaskCount >= queuedTaskTimeMaxSize) {
+            LOG.debug(String.format("Task started at %d is before the cutoff of %d and is being removed. Queue time %d will be removed from metric calculations.", historicalQueuedTask.getKey(), cutoff, historicalQueuedTask.getValue()));
             iterator.remove();
           } else {
             historicalQueuedTaskCount++;
             historicalQueuedTaskTotalTime += historicalQueuedTask.getValue();
+            LOG.debug(String.format("Task started at %d is after cutoff. Queue time %d will be used in metric calculations.", historicalQueuedTask.getKey(), historicalQueuedTask.getValue()));
           }
         } catch (NoSuchElementException e) {
           LOG.warn("Ran out of items in historical task queue time set.");
@@ -315,13 +328,18 @@ public class TaskExecutor extends AbstractIdleService {
       }
       this.historicalQueuedTaskCount.set(historicalQueuedTaskCount);
       this.historicalQueuedTaskTotalTime.set(historicalQueuedTaskTotalTime);
+      LOG.debug(String.format("%d historical tasks have been waiting for a total of %d ms.", historicalQueuedTaskCount, historicalQueuedTaskTotalTime));
 
-      this.queuedTaskCount.set(currentQueuedTaskCount + historicalQueuedTaskCount);
-      this.queuedTaskTotalTime.set(currentQueuedTaskTotalTime + historicalQueuedTaskTotalTime);
+      int totalQueuedTaskCount = currentQueuedTaskCount + historicalQueuedTaskCount;
+      long totalQueuedTaskTime = currentQueuedTaskTotalTime + historicalQueuedTaskTotalTime;
+      this.queuedTaskCount.set(totalQueuedTaskCount);
+      this.queuedTaskTotalTime.set(totalQueuedTaskTime);
+      LOG.debug(String.format("%d tasks have been waiting for a total of %d ms.", totalQueuedTaskCount, totalQueuedTaskTime));
 
-      this.lastCleanupTime = currentTimeMillis;
+      this.lastCalculationTime = currentTimeMillis;
+      LOG.debug("Finished metric calculation.");
     } else {
-      LOG.debug("Skipped cleanup of metrics sources because not enough time has passed since last cleanup.");
+      LOG.debug("Skipped metric calculation because not enough time has elapsed since the last calculation.");
     }
   }
 
@@ -332,42 +350,42 @@ public class TaskExecutor extends AbstractIdleService {
       metrics.put(name("queued", "current", "count"), new Gauge<Integer>() {
         @Override
         public Integer getValue() {
-          cleanupMetricSources();
+          calculateMetrics();
           return currentQueuedTaskCount.intValue();
         }
       });
       metrics.put(name("queued", "historical", "count"), new Gauge<Integer>() {
         @Override
         public Integer getValue() {
-          cleanupMetricSources();
+          calculateMetrics();
           return historicalQueuedTaskCount.intValue();
         }
       });
       metrics.put(name("queued", "count"), new Gauge<Integer>() {
         @Override
         public Integer getValue() {
-          cleanupMetricSources();
+          calculateMetrics();
           return queuedTaskCount.intValue();
         }
       });
       metrics.put(name("queued", "current", "time", "total"), new Gauge<Long>() {
         @Override
         public Long getValue() {
-          cleanupMetricSources();
+          calculateMetrics();
           return currentQueuedTaskTotalTime.longValue();
         }
       });
       metrics.put(name("queued", "historical", "time", "total"), new Gauge<Long>() {
         @Override
         public Long getValue() {
-          cleanupMetricSources();
+          calculateMetrics();
           return historicalQueuedTaskTotalTime.longValue();
         }
       });
       metrics.put(name("queued", "time", "total"), new Gauge<Long>() {
         @Override
         public Long getValue() {
-          cleanupMetricSources();
+          calculateMetrics();
           return queuedTaskTotalTime.longValue();
         }
       });
@@ -386,7 +404,10 @@ public class TaskExecutor extends AbstractIdleService {
     }
 
     public TrackingTask(Task task, long interval, TimeUnit timeUnit) {
-      queuedTasks.putIfAbsent(task.getTaskId(), System.currentTimeMillis() + timeUnit.toMillis(interval));
+      long now = System.currentTimeMillis();
+      long timeToRun = now + timeUnit.toMillis(interval);
+      LOG.debug(String.format("Task %s queued to run %s.", task.getTaskId(), timeToRun <= now ? "now" : "at " + timeToRun));
+      queuedTasks.putIfAbsent(task.getTaskId(), timeToRun);
       this.underlyingTask = task;
     }
 
@@ -407,7 +428,9 @@ public class TaskExecutor extends AbstractIdleService {
 
     private void onStart(long startTime) {
       Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId());
-      queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), startTime - queueTime);
+      long timeInQueue = startTime - queueTime;
+      LOG.debug(String.format("Task %s started. Saving queued time of %d ms to history.", underlyingTask.getTaskId(), timeInQueue));
+      queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), timeInQueue);
       runningTaskCount.inc();
     }
   }


Mime
View raw message