spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceivedBatch'
Date Wed, 17 Aug 2016 23:31:45 GMT
Repository: spark
Updated Branches:
  refs/heads/master d60af8f6a -> e6bef7d52


[SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceivedBatch'

https://issues.apache.org/jira/browse/SPARK-17038

## What changes were proposed in this pull request?

StreamingSource's lastReceivedBatch_submissionTime, lastReceivedBatch_processingStartTime,
and lastReceivedBatch_processingEndTime gauges all read their data from lastCompletedBatch instead of lastReceivedBatch.

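In particular, this makes it impossible to match lastReceivedBatch_records with a batch ID or submission
time: the records gauge reads lastReceivedBatchRecords, while the time gauges describe the last completed batch,
which may be a different batch.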

This is apparent when looking at StreamingSource.scala, lines 89-94.

## How was this patch tested?

Ran the unit tests manually on a local laptop, and added assertions on listener.lastReceivedBatch to StreamingJobProgressListenerSuite.

Author: Xin Ren <iamshrek@126.com>

Closes #14681 from keypointt/SPARK-17038.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6bef7d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6bef7d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6bef7d5

Branch: refs/heads/master
Commit: e6bef7d52f0e19ec771fb0f3e96c7ddbd1a6a19b
Parents: d60af8f
Author: Xin Ren <iamshrek@126.com>
Authored: Wed Aug 17 16:31:42 2016 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Wed Aug 17 16:31:42 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/StreamingSource.scala     | 6 +++---
 .../spark/streaming/ui/StreamingJobProgressListenerSuite.scala | 3 +++
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e6bef7d5/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index 9697437..0b306a2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -87,11 +87,11 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   // Gauge for last received batch, useful for monitoring the streaming job's running status,
   // displayed data -1 for any abnormal condition.
   registerGaugeWithOption("lastReceivedBatch_submissionTime",
-    _.lastCompletedBatch.map(_.submissionTime), -1L)
+    _.lastReceivedBatch.map(_.submissionTime), -1L)
   registerGaugeWithOption("lastReceivedBatch_processingStartTime",
-    _.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
+    _.lastReceivedBatch.flatMap(_.processingStartTime), -1L)
   registerGaugeWithOption("lastReceivedBatch_processingEndTime",
-    _.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
+    _.lastReceivedBatch.flatMap(_.processingEndTime), -1L)
 
   // Gauge for last received batch records.
   registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)

http://git-wip-us.apache.org/repos/asf/spark/blob/e6bef7d5/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 26b757c..46ab3ac 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -68,6 +68,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
     listener.runningBatches should be (Nil)
     listener.retainedCompletedBatches should be (Nil)
+    listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoSubmitted)))
     listener.lastCompletedBatch should be (None)
     listener.numUnprocessedBatches should be (1)
     listener.numTotalCompletedBatches should be (0)
@@ -81,6 +82,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
     listener.retainedCompletedBatches should be (Nil)
+    listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoStarted)))
     listener.lastCompletedBatch should be (None)
     listener.numUnprocessedBatches should be (1)
     listener.numTotalCompletedBatches should be (0)
@@ -123,6 +125,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (Nil)
     listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted)))
+    listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoCompleted)))
     listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted)))
     listener.numUnprocessedBatches should be (0)
     listener.numTotalCompletedBatches should be (1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message