flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/7] flink git commit: [FLINK-1442] [runtime] Minor code cleanups in MemoryArchivist
Date Thu, 05 Feb 2015 12:25:45 GMT
[FLINK-1442] [runtime] Minor code cleanups in MemoryArchivist


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56b7f85b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56b7f85b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56b7f85b

Branch: refs/heads/master
Commit: 56b7f85b4f6d522765df19a9710a098092ccde56
Parents: 8ae0dc2
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 4 15:07:02 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 5 12:17:15 2015 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/MemoryArchivist.scala    | 29 +++++++++-----------
 .../testingUtils/TestingMemoryArchivist.scala   |  8 +++---
 2 files changed, 17 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56b7f85b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 28e960d..88dc927 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -28,8 +28,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import scala.collection.mutable.LinkedHashMap
 import scala.ref.SoftReference
 
-class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
-ActorLogging {
+class MemoryArchivist(private val max_entries: Int) extends Actor 
+      with ActorLogMessages with ActorLogging {
+
   /**
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.
@@ -37,6 +38,7 @@ ActorLogging {
   val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
 
   override def receiveWithLogMessages: Receive = {
+    
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => {
       // wrap graph inside a soft reference
@@ -51,15 +53,15 @@ ActorLogging {
 
     case RequestJob(jobID) => {
       getGraph(jobID) match {
-        case graph: ExecutionGraph => sender ! JobFound(jobID, graph)
-        case _ => sender ! JobNotFound(jobID)
+        case Some(graph) => sender ! JobFound(jobID, graph)
+        case None => sender ! JobNotFound(jobID)
       }
     }
 
     case RequestJobStatus(jobID) => {
       getGraph(jobID) match {
-        case graph: ExecutionGraph => sender ! CurrentJobStatus(jobID, graph.getState)
-        case _ => sender ! JobNotFound(jobID)
+        case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
+        case None => sender ! JobNotFound(jobID)
       }
     }
   }
@@ -68,23 +70,18 @@ ActorLogging {
    * Gets all graphs that have not been garbage collected.
    * @return An iterable with all valid ExecutionGraphs
    */
-  def getAllGraphs() = graphs.values.flatMap(ref => ref.get match {
-    case Some(graph) => Seq(graph)
-    case _ => Seq()
-  })
+  protected def getAllGraphs(): Iterable[ExecutionGraph] = graphs.values.flatMap(_.get)
 
   /**
    * Gets a graph with a jobID if it has not been garbage collected.
    * @param jobID
    * @return ExecutionGraph or null
    */
-  def getGraph(jobID: JobID) = graphs.get(jobID) match {
-    case Some(softRef) => softRef.get match {
-      case Some(graph) => graph
-      case None => null
-    }
-    case None => null
+  protected def getGraph(jobID: JobID): Option[ExecutionGraph] = graphs.get(jobID) match {
+    case Some(softRef) => softRef.get
+    case None => None
   }
+  
 
   /**
    * Remove old ExecutionGraphs belonging to a jobID

http://git-wip-us.apache.org/repos/asf/flink/blob/56b7f85b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index ca5f7e4..d3a3526 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -32,10 +32,10 @@ trait TestingMemoryArchivist extends ActorLogMessages {
   def receiveTestingMessages: Receive = {
     case RequestExecutionGraph(jobID) =>
       val executionGraph = getGraph(jobID)
-      if (executionGraph != null) {
-        sender ! ExecutionGraphFound(jobID, executionGraph)
-      } else {
-        sender ! ExecutionGraphNotFound(jobID)
+      
+      executionGraph match {
+        case Some(graph) => sender ! ExecutionGraphFound(jobID, graph)
+        case None => sender ! ExecutionGraphNotFound(jobID)
       }
   }
 }


Mime
View raw message