flink-issues mailing list archives

From mxm <...@git.apache.org>
Subject [GitHub] flink pull request: Framesize fix
Date Tue, 28 Jul 2015 10:59:27 GMT
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/934#discussion_r35635395
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
---
    @@ -338,16 +332,51 @@ class JobManager(
                 // is the client waiting for the job result?
                 newJobStatus match {
                   case JobStatus.FINISHED =>
    -                val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
    -                  executionGraph.getAccumulatorsSerialized
    +
    +                val jobConfig = currentJobs.getOrElse(jobID,
    +                  throw new RuntimeException("Unknown Job: " + jobID))._1.getJobConfiguration
    +
    +                val smallAccumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
    +                  executionGraph.getSmallAccumulatorsContentSerialized
                     } catch {
                       case e: Exception =>
                         log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
                         Collections.emptyMap()
                     }
    -                val result = new SerializedJobExecutionResult(jobID, jobInfo.duration,
    -                                                              accumulatorResults)
    -                jobInfo.client ! decorateMessage(JobResultSuccess(result))
    +
    +                var largeAccumulatorResults: java.util.Map[String, java.util.List[BlobKey]] =
    +                  executionGraph.aggregateLargeUserAccumulatorBlobKeys()
    +
    +                /*
    +                * The following covers the case where partial accumulator results are small, but
    +                * when aggregated, they become big. In this case, this happens at the JobManager,
    +                * and this code is responsible for detecting it, storing the oversized result in
    +                * the BlobCache, and informing the Client accordingly.
    +                */
    +                
    +                val totalSize: Long = smallAccumulatorResults.asScala.map(_._2.getSizeInBytes).sum
    +                if (totalSize > AkkaUtils.getLargeAccumulatorThreshold(jobConfig)) {
    +                  // given that the client is going to do the final merging, we serialize and
    +                  // store the accumulator objects, not only the content
    +                  val serializedSmallAccumulators = executionGraph.getSmallAccumulatorsSerialized
    +
    +                  // store the accumulators in the blobCache and get the keys.
    +                  val newBlobKeys = LargeAccumulatorHelper.storeSerializedAccumulatorsToBlobCache(
    +                    getBlobCacheServerAddress, serializedSmallAccumulators)
    +                  smallAccumulatorResults.clear()
    +
    +                  // and update the blobKeys to send to the client.
    +                  largeAccumulatorResults = executionGraph.
    +                    addLargeUserAccumulatorBlobKeys(largeAccumulatorResults, newBlobKeys)
    +
    +                } else {
    +                  // do nothing
    +                  java.util.Collections.emptyMap()
    +                }
    --- End diff --
    
    The else branch can probably be removed? You could also let both branches of the `if` return the value directly into the `largeAccumulatorResults` variable. Then it could be a `val` instead of a `var`.
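
    The refactor might look something like the following. This is a minimal, self-contained sketch, not the actual Flink code: the value types are simplified (`Long` sizes, `String` blob keys instead of `SerializedValue`/`BlobKey`), and `storeToBlobCache` is a hypothetical stand-in for `LargeAccumulatorHelper.storeSerializedAccumulatorsToBlobCache`. The point is only that `if` is an expression in Scala, so both branches can yield the map and the `var` becomes a `val`:

    ```scala
    import scala.collection.JavaConverters._

    // Sketch with simplified placeholder types; `storeToBlobCache` is a
    // hypothetical stand-in for storing oversized accumulators in the BlobCache
    // and returning the resulting blob keys.
    def mergeAccumulatorResults(
        smallResults: java.util.Map[String, Long],
        aggregatedBlobKeys: java.util.Map[String, java.util.List[String]],
        threshold: Long,
        storeToBlobCache: java.util.Map[String, Long] => java.util.Map[String, java.util.List[String]])
      : java.util.Map[String, java.util.List[String]] = {

      val totalSize: Long = smallResults.asScala.values.sum

      // `if` is an expression: each branch produces the final map directly,
      // so no mutable `var` and no dead `else` branch are needed.
      val largeAccumulatorResults: java.util.Map[String, java.util.List[String]] =
        if (totalSize > threshold) {
          // oversized: offload the small results to the blob store
          // and merge the new keys with the already-aggregated ones
          val newBlobKeys = storeToBlobCache(smallResults)
          smallResults.clear()
          val merged = new java.util.HashMap[String, java.util.List[String]](aggregatedBlobKeys)
          merged.putAll(newBlobKeys)
          merged
        } else {
          // small enough: keep whatever blob keys were already aggregated
          aggregatedBlobKeys
        }

      largeAccumulatorResults
    }
    ```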


