spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-18256] Improve the performance of event log replay in HistoryServer
Date Sat, 05 Nov 2016 02:32:30 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4cee2ce25 -> 0e3312ee7


[SPARK-18256] Improve the performance of event log replay in HistoryServer

## What changes were proposed in this pull request?

This patch significantly improves the performance of event log replay in the HistoryServer
via two simple changes:

- **Don't use `extractOpt`**: it turns out that `json4s`'s `extractOpt` method uses exceptions
for control flow, causing huge performance bottlenecks due to the overhead of initializing
exceptions. To avoid this overhead, we can simply use our own` Utils.jsonOption` method. This
patch replaces all uses of `extractOpt` with `Utils.jsonOption` and adds a style checker rule
to ban the use of the slow `extractOpt` method.
- **Don't call `Utils.getFormattedClassName` for every event**: the old code called` Utils.getFormattedClassName`
dozens of times per replayed event in order to match up class names in events with SparkListener
event names. By simply storing the results of these calls in constants rather than recomputing
them, we're able to eliminate a huge performance hotspot by removing thousands of expensive
`Class.getSimpleName` calls.

## How was this patch tested?

Tested by profiling the replay of a long event log using YourKit. For an event log containing
1000+ jobs, each of which had thousands of tasks, the changes in this patch cut the replay
time in half:

![image](https://cloud.githubusercontent.com/assets/50748/19980953/31154622-a1bd-11e6-9be4-21fbb9b3f9a7.png)

Prior to this patch's changes, the two slowest methods in log replay were internal exceptions
thrown by `Json4S` and calls to `Class.getSimpleName()`:

![image](https://cloud.githubusercontent.com/assets/50748/19981052/87416cce-a1bd-11e6-9f25-06a7cd391822.png)

After this patch, these hotspots are completely eliminated.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15756 from JoshRosen/speed-up-jsonprotocol.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e3312ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e3312ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e3312ee

Branch: refs/heads/master
Commit: 0e3312ee72c44f4c9acafbd80d0c8a14f3aff875
Parents: 4cee2ce
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Fri Nov 4 19:32:26 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Fri Nov 4 19:32:26 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/JsonProtocol.scala    | 106 +++++++++++--------
 scalastyle-config.xml                           |   6 ++
 2 files changed, 70 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e3312ee/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c11eb3f..6593aab 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -107,20 +107,20 @@ private[spark] object JsonProtocol {
   def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
     val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
     val properties = propertiesToJson(stageSubmitted.properties)
-    ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
     ("Stage Info" -> stageInfo) ~
     ("Properties" -> properties)
   }
 
   def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
     val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
-    ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
     ("Stage Info" -> stageInfo)
   }
 
   def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
     val taskInfo = taskStart.taskInfo
-    ("Event" -> Utils.getFormattedClassName(taskStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
     ("Stage ID" -> taskStart.stageId) ~
     ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
     ("Task Info" -> taskInfoToJson(taskInfo))
@@ -128,7 +128,7 @@ private[spark] object JsonProtocol {
 
   def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue
= {
     val taskInfo = taskGettingResult.taskInfo
-    ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
     ("Task Info" -> taskInfoToJson(taskInfo))
   }
 
@@ -137,7 +137,7 @@ private[spark] object JsonProtocol {
     val taskInfo = taskEnd.taskInfo
     val taskMetrics = taskEnd.taskMetrics
     val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
-    ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
     ("Stage ID" -> taskEnd.stageId) ~
     ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
     ("Task Type" -> taskEnd.taskType) ~
@@ -148,7 +148,7 @@ private[spark] object JsonProtocol {
 
   def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
     val properties = propertiesToJson(jobStart.properties)
-    ("Event" -> Utils.getFormattedClassName(jobStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
     ("Job ID" -> jobStart.jobId) ~
     ("Submission Time" -> jobStart.time) ~
     ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
@@ -158,7 +158,7 @@ private[spark] object JsonProtocol {
 
   def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
     val jobResult = jobResultToJson(jobEnd.jobResult)
-    ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~
     ("Job ID" -> jobEnd.jobId) ~
     ("Completion Time" -> jobEnd.time) ~
     ("Job Result" -> jobResult)
@@ -170,7 +170,7 @@ private[spark] object JsonProtocol {
     val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
     val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
     val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
-    ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~
     ("JVM Information" -> jvmInformation) ~
     ("Spark Properties" -> sparkProperties) ~
     ("System Properties" -> systemProperties) ~
@@ -179,7 +179,7 @@ private[spark] object JsonProtocol {
 
   def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue
= {
     val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
-    ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
     ("Block Manager ID" -> blockManagerId) ~
     ("Maximum Memory" -> blockManagerAdded.maxMem) ~
     ("Timestamp" -> blockManagerAdded.time)
@@ -187,18 +187,18 @@ private[spark] object JsonProtocol {
 
   def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue
= {
     val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
-    ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerRemoved) ~
     ("Block Manager ID" -> blockManagerId) ~
     ("Timestamp" -> blockManagerRemoved.time)
   }
 
   def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
-    ("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.unpersistRDD) ~
     ("RDD ID" -> unpersistRDD.rddId)
   }
 
   def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
-    ("Event" -> Utils.getFormattedClassName(applicationStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~
     ("App Name" -> applicationStart.appName) ~
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
@@ -208,33 +208,33 @@ private[spark] object JsonProtocol {
   }
 
   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
-    ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd) ~
     ("Timestamp" -> applicationEnd.time)
   }
 
   def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
-    ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
     ("Timestamp" -> executorAdded.time) ~
     ("Executor ID" -> executorAdded.executorId) ~
     ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
   }
 
   def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
-    ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorRemoved) ~
     ("Timestamp" -> executorRemoved.time) ~
     ("Executor ID" -> executorRemoved.executorId) ~
     ("Removed Reason" -> executorRemoved.reason)
   }
 
   def logStartToJson(logStart: SparkListenerLogStart): JValue = {
-    ("Event" -> Utils.getFormattedClassName(logStart)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart) ~
     ("Spark Version" -> SPARK_VERSION)
   }
 
   def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue
= {
     val execId = metricsUpdate.execId
     val accumUpdates = metricsUpdate.accumUpdates
-    ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates)
=>
       ("Task ID" -> taskId) ~
@@ -485,7 +485,7 @@ private[spark] object JsonProtocol {
    * JSON deserialization methods for SparkListenerEvents |
    * ---------------------------------------------------- */
 
-  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+  private object SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES {
     val stageSubmitted = Utils.getFormattedClassName(SparkListenerStageSubmitted)
     val stageCompleted = Utils.getFormattedClassName(SparkListenerStageCompleted)
     val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart)
@@ -503,6 +503,10 @@ private[spark] object JsonProtocol {
     val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
     val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
     val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
+  }
+
+  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+    import SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES._
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -540,7 +544,8 @@ private[spark] object JsonProtocol {
 
   def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+    val stageAttemptId =
+      Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val taskInfo = taskInfoFromJson(json \ "Task Info")
     SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
@@ -552,7 +557,8 @@ private[spark] object JsonProtocol {
 
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+    val stageAttemptId =
+      Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val taskType = (json \ "Task Type").extract[String]
     val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
     val taskInfo = taskInfoFromJson(json \ "Task Info")
@@ -662,20 +668,22 @@ private[spark] object JsonProtocol {
 
   def stageInfoFromJson(json: JValue): StageInfo = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
+    val attemptId = Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
     val parentIds = Utils.jsonOption(json \ "Parent IDs")
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
-    val details = (json \ "Details").extractOpt[String].getOrElse("")
+    val details = Utils.jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
-    val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
-      case Some(values) => values.map(accumulableInfoFromJson)
-      case None => Seq[AccumulableInfo]()
+    val accumulatedValues = {
+      Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
+        case Some(values) => values.map(accumulableInfoFromJson)
+        case None => Seq[AccumulableInfo]()
+      }
     }
 
     val stageInfo = new StageInfo(
@@ -692,17 +700,17 @@ private[spark] object JsonProtocol {
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
-    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
+    val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
-    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
+    val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
-    val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false)
-    val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
+    val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
+    val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]])
match {
       case Some(values) => values.map(accumulableInfoFromJson)
       case None => Seq[AccumulableInfo]()
     }
@@ -719,12 +727,13 @@ private[spark] object JsonProtocol {
 
   def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
     val id = (json \ "ID").extract[Long]
-    val name = (json \ "Name").extractOpt[String]
+    val name = Utils.jsonOption(json \ "Name").map(_.extract[String])
     val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name,
v) }
     val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v)
}
-    val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
-    val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
-    val metadata = (json \ "Metadata").extractOpt[String]
+    val internal = Utils.jsonOption(json \ "Internal").exists(_.extract[Boolean])
+    val countFailedValues =
+      Utils.jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
+    val metadata = Utils.jsonOption(json \ "Metadata").map(_.extract[String])
     new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
   }
 
@@ -782,9 +791,11 @@ private[spark] object JsonProtocol {
       readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
       readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
       readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
-      readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L))
+      readMetrics.incLocalBytesRead(
+        Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
       readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
-      readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L))
+      readMetrics.incRecordsRead(
+        Utils.jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
       metrics.mergeShuffleReadMetrics()
     }
 
@@ -793,8 +804,8 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
       val writeMetrics = metrics.shuffleWriteMetrics
       writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
-      writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
-        .extractOpt[Long].getOrElse(0L))
+      writeMetrics.incRecordsWritten(
+        Utils.jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
       writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
     }
 
@@ -802,14 +813,16 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
       val outputMetrics = metrics.outputMetrics
       outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
-      outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L))
+      outputMetrics.setRecordsWritten(
+        Utils.jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
     }
 
     // Input metrics
     Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
       val inputMetrics = metrics.inputMetrics
       inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
-      inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+      inputMetrics.incRecordsRead(
+        Utils.jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
     }
 
     // Updated blocks
@@ -824,7 +837,7 @@ private[spark] object JsonProtocol {
     metrics
   }
 
-  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+  private object TASK_END_REASON_FORMATTED_CLASS_NAMES {
     val success = Utils.getFormattedClassName(Success)
     val resubmitted = Utils.getFormattedClassName(Resubmitted)
     val fetchFailed = Utils.getFormattedClassName(FetchFailed)
@@ -834,6 +847,10 @@ private[spark] object JsonProtocol {
     val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
     val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
     val unknownReason = Utils.getFormattedClassName(UnknownReason)
+  }
+
+  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+    import TASK_END_REASON_FORMATTED_CLASS_NAMES._
 
     (json \ "Reason").extract[String] match {
       case `success` => Success
@@ -850,7 +867,8 @@ private[spark] object JsonProtocol {
         val className = (json \ "Class Name").extract[String]
         val description = (json \ "Description").extract[String]
         val stackTrace = stackTraceFromJson(json \ "Stack Trace")
-        val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull
+        val fullStackTrace =
+          Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
         // Fallback on getting accumulator updates from TaskMetrics, which was logged in
Spark 1.x
         val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
           .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
@@ -891,9 +909,13 @@ private[spark] object JsonProtocol {
     BlockManagerId(executorId, host, port)
   }
 
-  def jobResultFromJson(json: JValue): JobResult = {
+  private object JOB_RESULT_FORMATTED_CLASS_NAMES {
     val jobSucceeded = Utils.getFormattedClassName(JobSucceeded)
     val jobFailed = Utils.getFormattedClassName(JobFailed)
+  }
+
+  def jobResultFromJson(json: JValue): JobResult = {
+    import JOB_RESULT_FORMATTED_CLASS_NAMES._
 
     (json \ "Result").extract[String] match {
       case `jobSucceeded` => JobSucceeded

http://git-wip-us.apache.org/repos/asf/spark/blob/0e3312ee/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 81d57d7..4833385 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -217,6 +217,12 @@ This file is divided into 3 sections:
     of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
   </check>
 
+  <check customId="extractopt" level="error" class="org.scalastyle.scalariform.TokenChecker"
enabled="true">
+    <parameters><parameter name="regex">extractOpt</parameter></parameters>
+    <customMessage>Use Utils.jsonOption(x).map(.extract[T]) instead of .extractOpt[T],
as the latter
+    is slower.  </customMessage>
+  </check>
+
   <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
     <parameters>
       <parameter name="groups">java,scala,3rdParty,spark</parameter>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message