flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1019) Rework RPC service
Date Fri, 05 Dec 2014 19:47:13 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235987#comment-14235987 ]

ASF GitHub Bot commented on FLINK-1019:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r21395124
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -0,0 +1,518 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager
    +
    +import java.io.File
    +import java.net.{InetSocketAddress}
    +import java.util.concurrent.TimeUnit
    +
    +import akka.actor._
    +import akka.pattern.Patterns
    +import akka.pattern.{ask, pipe}
    +import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
    +import org.apache.flink.core.io.InputSplitAssigner
    +import org.apache.flink.runtime.blob.BlobServer
    +import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph}
    +import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
    +import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
    +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
    +import org.apache.flink.runtime.taskmanager.TaskManager
    +import org.apache.flink.runtime.{JobException, ActorLogMessages}
    +import org.apache.flink.runtime.akka.AkkaUtils
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
    +import org.apache.flink.runtime.instance.{InstanceManager}
    +import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
    +import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
    +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
    +import org.apache.flink.runtime.messages.JobManagerMessages._
    +import org.apache.flink.runtime.messages.RegistrationMessages._
    +import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, Heartbeat}
    +import org.apache.flink.runtime.profiling.ProfilingUtils
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.convert.WrapAsScala
    +import scala.concurrent.{Future}
    +import scala.concurrent.duration._
    +
    +class JobManager(val configuration: Configuration) extends
    +Actor with ActorLogMessages with ActorLogging with WrapAsScala {
    +  import context._
    +  implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
    +    ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
    +
    +  Execution.timeout = timeout;
    +
    +  log.info("Starting job manager.")
    +
    +  val (archiveCount,
    +    profiling,
    +    cleanupInterval,
    +    defaultExecutionRetries,
    +    delayBetweenRetries) = JobManager.parseConfiguration(configuration)
    +
    +  // Props for the profiler actor
    +  def profilerProps: Props = Props(classOf[JobManagerProfiler])
    +
    +  // Props for the archive actor
    +  def archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
    +
    +  val profiler = profiling match {
    +    case true => Some(context.actorOf(profilerProps, JobManager.PROFILER_NAME))
    +    case false => None
    +  }
    +
    +  val archive = context.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
    +
    +  val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
    +  val instanceManager = new InstanceManager()
    +  val scheduler = new FlinkScheduler()
    +  val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), cleanupInterval)
    +
    +  // List of current jobs running
    +  val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
    +
    +  // Map of actors which want to be notified once a specific job terminates
    +  val finalJobStatusListener = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +
    +  instanceManager.addInstanceListener(scheduler)
    +
    +  log.info(s"Started job manager. Waiting for incoming messages.")
    +
    +  override def postStop(): Unit = {
    +    log.info(s"Stopping job manager ${self.path}.")
    +    instanceManager.shutdown()
    +    scheduler.shutdown()
    +    libraryCacheManager.shutdown()
    +  }
    +
    +  override def receiveWithLogMessages: Receive = {
    +    case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => {
    +      val taskManager = sender()
    +      val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
    +        hardwareInformation, numberOfSlots)
    +
    +      // to be notified when the taskManager is no longer reachable
    +//      context.watch(taskManager);
    +
    +      taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
    +    }
    +
    +    case RequestNumberRegisteredTaskManager => {
    +      sender() ! instanceManager.getNumberOfRegisteredTaskManagers
    +    }
    +
    +    case RequestTotalNumberOfSlots => {
    +      sender() ! instanceManager.getTotalNumberOfSlots
    +    }
    +
    +    case SubmitJob(jobGraph, listenToEvents, detach) => {
    +      try {
    +        if (jobGraph == null) {
    +          sender() ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" +
    +            " null."))
    +        } else {
    +
    +          log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
    +
    +          // Create the user code class loader
    +          libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
    +
    +          val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(),
    +            (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
    +              jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), JobInfo(sender(),
    +              System.currentTimeMillis())))
    +
    +          val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){
    +            jobGraph.getNumberOfExecutionRetries
    +          }else{
    +            defaultExecutionRetries
    +          }
    +
    +          executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
    +          executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
    +
    +          val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
    +
    +          if (userCodeLoader == null) {
    +            throw new JobException("The user code class loader could not be initialized.")
    +          }
    +
    +          log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph
    +            .getName}).")
    +
    +          for (vertex <- jobGraph.getVertices) {
    +            val executableClass = vertex.getInvokableClassName
    +            if (executableClass == null || executableClass.length == 0) {
    +              throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +
    +                s"invokable class.")
    +            }
    +
    +            vertex.initializeOnMaster(userCodeLoader)
    +          }
    +
    +          // topological sorting of the job vertices
    +          val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
    +
    +          log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph
    +            .getJobID} (${jobGraph.getName}).")
    +
    +          executionGraph.attachJobGraph(sortedTopology)
    +
    +          log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
    +            s"(${jobGraph.getName}).")
    +
    +          executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
    +
    +          // get notified about job status changes
    +          executionGraph.registerJobStatusListener(self)
    +
    +          if(listenToEvents){
    +            // the sender will be notified about state changes
    +            executionGraph.registerExecutionListener(sender())
    +            executionGraph.registerJobStatusListener(sender())
    +          }
    +
    +          jobInfo.detach = detach
    +
    +          log.info(s"Scheduling job ${jobGraph.getName}.")
    +
    +          executionGraph.scheduleForExecution(scheduler)
    +
    +          sender() ! SubmissionSuccess(jobGraph.getJobID)
    +        }
    +      } catch {
    +        case t: Throwable =>
    +          log.error(t, "Job submission failed.")
    +
    +          currentJobs.get(jobGraph.getJobID) match {
    +            case Some((executionGraph, jobInfo)) =>
    +              executionGraph.fail(t)
    +
    +              // don't send the client the final job status because we already sent a
    +              // SubmissionFailure
    +              jobInfo.detach = true
    +
    +              val status = Patterns.ask(self, RequestFinalJobStatus(jobGraph.getJobID), 10 second)
    +              status.onFailure{
    +                case _: Throwable => self ! JobStatusChanged(executionGraph.getJobID,
    +                  JobStatus.FAILED, System.currentTimeMillis(),
    +                  s"Cleanup job ${jobGraph.getJobID}.")
    +              }
    +            case None =>
    +              libraryCacheManager.unregisterJob(jobGraph.getJobID)
    +              currentJobs.remove(jobGraph.getJobID)
    +
    +          }
    +
    +          sender() ! SubmissionFailure(jobGraph.getJobID, t)
    +      }
    +    }
    +
    +    case CancelJob(jobID) => {
    +      log.info(s"Trying to cancel job with ID ${jobID}.")
    +
    +      currentJobs.get(jobID) match {
    +        case Some((executionGraph, _)) =>
    +          Future {
    +            executionGraph.cancel()
    +          }
    +          sender() ! CancellationSuccess(jobID)
    +        case None =>
    +          log.info(s"No job found with ID ${jobID}.")
    +          sender() ! CancellationFailure(jobID, new IllegalArgumentException(s"No job found with " +
    +            s"ID ${jobID}."))
    +      }
    +    }
    +
    +    case UpdateTaskExecutionState(taskExecutionState) => {
    +      if(taskExecutionState == null){
    +        sender() ! false
    +      }else {
    +        currentJobs.get(taskExecutionState.getJobID) match {
    +          case Some((executionGraph, _)) =>
    +            sender() ! executionGraph.updateState(taskExecutionState)
    +          case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
    +            .getJobID} to change state to ${taskExecutionState.getExecutionState}.")
    +            sender() ! false
    +        }
    +      }
    +    }
    +
    +    case RequestNextInputSplit(jobID, vertexID) => {
    +      val nextInputSplit = currentJobs.get(jobID) match {
    +        case Some((executionGraph,_)) => executionGraph.getJobVertex(vertexID) match {
    +          case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
    +            case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(null)
    --- End diff ---
    
    When passing a `null` hostname here, input split localization is impossible.
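    
    For illustration, here is a self-contained Scala sketch of why the hostname argument matters. This is toy code, not the Flink `InputSplitAssigner` API; the names `Split` and `LocalityAwareAssigner` are made up for the example. With a `null` host the local-match branch can never fire, so every request falls back to an arbitrary split:
    
        import scala.collection.mutable
        
        // Toy model of an input split carrying a preferred host for locality.
        case class Split(id: Int, preferredHost: String)
        
        // Minimal locality-aware assigner: hand out a host-local split when the
        // requester's hostname is known, otherwise fall back to any remaining one.
        class LocalityAwareAssigner(initial: Seq[Split]) {
          private val remaining = mutable.ListBuffer(initial: _*)
        
          def getNextInputSplit(host: String): Option[Split] = {
            // With host == null this predicate is always false, so no request
            // can ever be served a local split: localization is impossible.
            val pick = remaining.find(s => host != null && s.preferredHost == host)
              .orElse(remaining.headOption)
            pick.foreach(remaining -= _)
            pick
          }
        }
        
        // Usage:
        //   val a = new LocalityAwareAssigner(Seq(Split(1, "host-a"), Split(2, "host-b")))
        //   a.getNextInputSplit("host-b")  // Some(Split(2,"host-b")): local split preferred
        //   a.getNextInputSplit(null)      // Some(Split(1,"host-a")): no locality possible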


> Rework RPC service
> ------------------
>
>                 Key: FLINK-1019
>                 URL: https://issues.apache.org/jira/browse/FLINK-1019
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>            Reporter: Ufuk Celebi
>
> There is work going on to improve the RPC service by using [Akka|akka.io]. I couldn't find an issue for it.
> Could one of the two people working on it ([~StephanEwen] and [~asteriosk]) please give an overview of the changes and a status update?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
