spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vanzin <...@git.apache.org>
Subject [GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Date Mon, 28 Sep 2015 21:13:30 GMT
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r40606368
  
    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
---
    @@ -0,0 +1,1048 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.net.{ConnectException, URI}
    +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
    +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent,
TimelinePutResponse}
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +import org.apache.hadoop.yarn.client.api.TimelineClient
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
    +import org.apache.spark.util.{SystemClock, Utils}
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * A Yarn Extension Service to post lifecycle events to a registered
    + * YARN Timeline Server.
    + */
    +private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
    +
    +  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
    +
    +  /** Simple state model implemented in an atomic integer */
    +  private val _serviceState = new AtomicInteger(CreatedState)
    +
    +  def serviceState: Int = {
    +    _serviceState.get()
    +  }
    +  def enterState(state: Int): Int = {
    +    logDebug(s"Entering state $state from $serviceState")
    +    _serviceState.getAndSet(state)
    +  }
    +
    +  /**
    +   * Spark context; valid once started
    +   */
    +  private var sparkContext: SparkContext = _
    +
    +  /** YARN configuration from the spark context */
    +  private var config: YarnConfiguration = _
    +
    +  /** application ID. */
    +  private var _applicationId: ApplicationId = _
    +
    +  /** attempt ID this will be null if the service is started in yarn-client mode */
    +  private var _attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** YARN timeline client */
    +  private var _timelineClient: Option[TimelineClient] = None
    +
    +  /** registered event listener */
    +  private var listener: Option[YarnEventListener] = None
    +
    +  /** Application name  from the spark start event */
    +  private var applicationName: String = _
    +
    +  /** Application ID*/
    +  private var sparkApplicationId: Option[String] = None
    +
    +  /** Optional Attempt ID from the spark start event */
    +  private var sparkApplicationAttemptId: Option[String] = None
    +
    +  /** user name as derived from `SPARK_USER` env var or `UGI` */
    +  private var userName = Utils.getCurrentUserName()
    +
    +  /** Clock for recording time */
    +  private val clock = new SystemClock()
    +
    +  /**
    +   * Start time of the application, as received in the start event.
    +   */
    +  private var startTime: Long = _
    +
    +  /**
    +   * Start time of the application, as received in the end event.
    +   */
    +  private var endTime: Long = _
    +
    +  /** number of events to batch up before posting*/
    +  private var _batchSize = DEFAULT_BATCH_SIZE
    +
    +  /** queue of entities to asynchronously post, plus the number of events in each entry
*/
    +  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
    +
    +  /** limit on the total number of events permitted */
    +  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
    +
    +  /**
    +   * List of events which will be pulled into a timeline
    +   * entity when created
    +   */
    +  private var pendingEvents = new mutable.LinkedList[TimelineEvent]()
    +
    +  private var applicationStartEvent: Option[SparkListenerApplicationStart] = None
    +  private var applicationEndEvent: Option[SparkListenerApplicationEnd] = None
    +
    +  /** Has a start event been processed? */
    +  private val appStartEventProcessed = new AtomicBoolean(false)
    +
    +  /* has the application event event been processed */
    +  private val appEndEventProcessed = new AtomicBoolean(false)
    +
    +  /** counter of events processed -that is have been through handleEvent()*/
    +  private val _eventsProcessed = new AtomicLong(0)
    +
    +  /** counter of events queued. */
    +  private val _eventsQueued = new AtomicLong(0)
    +
    +  private val _entityPostAttempts = new AtomicLong(0)
    +  private val _entityPostSuccesses = new AtomicLong(0)
    +  /** how many entity postings failed? */
    +  private val _entityPostFailures = new AtomicLong(0)
    +  private val _eventsDropped = new AtomicLong(0)
    +
    +  /** how many flushes have taken place? */
    +  private val flushCount = new AtomicLong(0)
    +
    +  /** Event handler */
    +  private var eventHandlingThread: Option[Thread] = None
    +
    +  /**
    +   * Flag to indicate the thread is stopped; events aren't being
    +   * processed.
    +   */
    +  private val stopped = new AtomicBoolean(true)
    +
    +  /**
    +   * Boolean to track whether a thread is active or not, for tests to
    +   * monitor and see if the thread has completed.
    +   */
    +  private val postThreadActive = new AtomicBoolean(false)
    +
    +  /** How long to wait for shutdown before giving up */
    +  private var shutdownWaitTime = 0L
    +
    +  /**
    +   * What is the initial and incrementing interval for POST retries?
    +   */
    +  private var retryInterval = 0L
    +
    +  /** Domain ID for entities: may be null */
    +  private var domainId: Option[String] = None
    +
    +  /** URI to timeline web application -valid after `serviceStart()` */
    +  private var _timelineWebappAddress: URI = _
    +
    +  /**
    +   * Create a timeline client and start it. This does not update the
    +   * `timelineClient` field, though it does verify that the field
    +   * is unset.
    +   *
    +   * The method is private to the package so that tests can access it, which
    +   * some of the mock tests do to override the timeline client creation.
    +   * @return the timeline client
    +   */
    +  private[yarn] def createTimelineClient(): TimelineClient = {
    +    require(_timelineClient.isEmpty, "timeline client already set")
    +    YarnTimelineUtils.createTimelineClient(sparkContext)
    +  }
    +
    +  /**
    +   * Get the timeline client.
    +   * @return the client
    +   * @throws Exception if the timeline client is not currently running
    +   */
    +  def timelineClient: TimelineClient = {
    +    require(_timelineClient.isDefined)
    +    _timelineClient.get
    +  }
    +
    +  /**
    +   * Get the total number of events dropped due to the queue of
    +   * outstanding posts being too long
    +   * @return counter of events processed
    +   */
    +
    +  def eventsDropped: Long = {
    +    _eventsDropped.get()
    +  }
    +
    +  /**
    +   * Get the total number of processed events, those handled in the back-end thread without
    +   * being rejected
    +   * @return counter of events processed
    +   */
    +  def eventsProcessed: Long = {
    +    _eventsProcessed.get
    +  }
    +
    +  /**
    +   * Get the total number of events queued
    +   * @return the total event count
    +   */
    +  def eventsQueued: Long = {
    +    _eventsQueued.get
    +  }
    +
    +  /**
    +   * Get the current size of the queue
    +   * @return the current queue length
    +   */
    +  def getQueueSize: Int = {
    +    _entityQueue.size()
    +  }
    +
    +  /**
    +   * Get the current batch size
    +   * @return the batch size
    +   */
    +  def batchSize: Int = {
    +    _batchSize
    +  }
    +
    +  /**
    +   * Query the counter of attempts to post events
    +   * @return
    +   */
    +  def postAttempts: Long = _entityPostAttempts.get()
    +
    +  /**
    +   * Get the total number of failed post operations
    +   * @return counter of timeline post operations which failed
    +   */
    +  def postFailures: Long = {
    +    _entityPostFailures.get
    +  }
    +
    +  /**
    +   * Query the counter of post successes
    +   * @return the number of successful posts
    +   */
    +  def postSuccesses: Long = _entityPostSuccesses.get()
    +
    +  /**
    +   * is the asynchronous posting thread active?
    +   * @return true if the post thread has started; false if it has not yet/ever started,
or
    +   *         if it has finished.
    +   */
    +  def isPostThreadActive: Boolean = {
    +    postThreadActive.get
    +  }
    +
    +  /**
    +   * The YARN application ID of this history service
    +   * @return the application ID provided when the service started
    +   */
    +  def applicationId: ApplicationId =  _applicationId
    +
    +  /**
    +   * The YARN attempt ID of this history service
    +   * @return the attempt ID provided when the service started
    +   */
    +  def attemptId: Option[ApplicationAttemptId] =  _attemptId
    +
    +  /**
    +   * Reset the timeline client
    +   * <p>
    +   * 1. Stop the timeline client service if running.
    +   * 2. set the `timelineClient` field to `None`
    +   */
    +  def stopTimelineClient(): Unit = {
    +    _timelineClient.foreach(_.stop())
    +    _timelineClient = None
    +  }
    +
    +  /**
    +   * Create the timeline domain.
    +   *
    +   * A Timeline Domain is a uniquely identified 'namespace' for accessing parts of the
timeline.
    +   * Security levels are are managed at the domain level, so one is created if the
    +   * spark acls are enabled. Full access is then granted to the current user,
    +   * all users in the configuration options `"spark.modify.acls"` and `"spark.admin.acls"`;
    +   * read access to those users and those listed in `"spark.ui.view.acls"`
    +   *
    +   * @return an optional domain string. If `None`, then no domain was created.
    +   */
    +  private def createTimelineDomain(): Option[String] = {
    +    val sparkConf = sparkContext.getConf
    +    val aclsOn = sparkConf.getBoolean("spark.ui.acls.enable",
    +        sparkConf.getBoolean("spark.acls.enable", false))
    +    if (!aclsOn) {
    +      logDebug("ACLs are disabled; not creating the timeline domain")
    +      return None
    +    }
    +    val predefDomain = sparkConf.getOption(TIMELINE_DOMAIN)
    +    if (predefDomain.isDefined) {
    +      logDebug(s"Using predefined domain $predefDomain")
    +      return predefDomain
    +    }
    +    val current = UserGroupInformation.getCurrentUser.getShortUserName
    +    val adminAcls  = stringToSet(sparkConf.get("spark.admin.acls", ""))
    +    val viewAcls = stringToSet(sparkConf.get("spark.ui.view.acls", ""))
    +    val modifyAcls = stringToSet(sparkConf.get("spark.modify.acls", ""))
    +
    +    val readers = (Seq(current) ++ adminAcls ++ modifyAcls ++ viewAcls).mkString(" ")
    +    val writers = (Seq(current) ++ adminAcls ++ modifyAcls).mkString(" ")
    +    var domain = DOMAIN_ID_PREFIX + _applicationId
    +    logInfo(s"Creating domain $domain with readers: $readers and writers: $writers")
    +
    +    // create the timeline domain with the reader and writer permissions
    +    val timelineDomain = new TimelineDomain()
    +    timelineDomain.setId(domain)
    +    timelineDomain.setReaders(readers)
    +    timelineDomain.setWriters(writers)
    +    try {
    +      timelineClient.putDomain(timelineDomain)
    +      Some(domain)
    +    } catch {
    +      case e: Exception => {
    +        logError(s"cannot create the domain $domain", e)
    +        // fallback to default
    +        None
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Start the service, calling the service's `init()` and `start()` actions in the
    +   * correct order
    +   * @param binding binding to the spark application and YARN
    +   */
    +  override def start(binding: YarnExtensionServiceBinding): Unit = {
    +    val oldstate = enterState(StartedState)
    +    if (oldstate != CreatedState) {
    +      // state model violation
    +      _serviceState.set(oldstate)
    +      throw new IllegalArgumentException(s"Cannot start the service from state $oldstate")
    +    }
    +    val context = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    require(context != null, "Null context parameter")
    +    bindToYarnApplication(appId, attemptId)
    +    this.sparkContext = context
    +
    +    this.config = new YarnConfiguration(context.hadoopConfiguration)
    +
    +    val sparkConf = sparkContext.conf
    +
    +    // work out the attempt ID from the YARN attempt ID. No attempt, assume "1".
    +    // this is assumed by the AM, which uses it when creating a path to an attempt
    +    val attempt1 = attemptId match {
    +      case Some(attempt) => attempt.getAttemptId.toString
    +      case None => CLIENT_BACKEND_ATTEMPT_ID
    +    }
    +    setContextAppAndAttemptInfo(Some(appId.toString), Some(attempt1))
    +    _batchSize = sparkConf.getInt(BATCH_SIZE, _batchSize)
    +    _postQueueLimit = sparkConf.getInt(POST_QUEUE_LIMIT, _postQueueLimit)
    +    retryInterval = 1000 * sparkConf.getTimeAsSeconds(POST_RETRY_INTERVAL,
    +      DEFAULT_POST_RETRY_INTERVAL)
    +    shutdownWaitTime = 1000 * sparkConf.getTimeAsSeconds(SHUTDOWN_WAIT_TIME,
    +      DEFAULT_SHUTDOWN_WAIT_TIME)
    +
    +
    +    if (timelineServiceEnabled) {
    +      _timelineWebappAddress = getTimelineEndpoint(config)
    +
    +      logInfo(s"Starting $this")
    +      logInfo(s"Spark events will be published to the Timeline service at ${_timelineWebappAddress}")
    +      _timelineClient = Some(createTimelineClient())
    +      domainId = createTimelineDomain()
    +      // declare that the processing is started
    +      stopped.set(false)
    +      eventHandlingThread = Some(new Thread(new Dequeue(), "HistoryEventHandlingThread"))
    +      eventHandlingThread.get.start()
    +    } else {
    +      logInfo("Timeline service is disabled")
    +    }
    +    if (registerListener()) {
    +      logInfo(s"History Service listening for events: $this")
    +    } else {
    +      logInfo(s"History Service is not listening for events: $this")
    +    }
    +  }
    +
    +  /**
    +   * Check the service configuration to see if the timeline service is enabled
    +   * @return true if `YarnConfiguration.TIMELINE_SERVICE_ENABLED` is set.
    +   */
    +  def timelineServiceEnabled: Boolean = {
    +    YarnTimelineUtils.timelineServiceEnabled(config)
    +  }
    +
    +  /**
    +   * Return a summary of the service state to help diagnose problems
    +   * during test runs, possibly even production
    +   * @return a summary of the current service state
    +   */
    +  override def toString(): String = {
    +    s"YarnHistoryService for application $applicationId attempt $attemptId;" +
    +    s" state=$serviceState;" +
    +    s" endpoint=${_timelineWebappAddress};" +
    +    s" bonded to ATS=$bondedToATS;" +
    +    s" listening=$listening;" +
    +    s" batchSize=$batchSize;" +
    +    s" flush count=$getFlushCount;" +
    +    s" total number queued=$eventsQueued, processed=$eventsProcessed;" +
    +    s" attempted entity posts=$postAttempts" +
    +    s" successful entity posts=$postSuccesses" +
    +    s" failed entity posts=$postFailures;" +
    +    s" events dropped=$eventsDropped;" +
    +    s" app start event received=$appStartEventProcessed;" +
    +    s" app end event received=$appEndEventProcessed;"
    +  }
    +
    +  /**
    +   * Is the service listening to events from the spark context?
    +   * @return true if it has registered as a listener
    +   */
    +  def listening: Boolean = {
    +    listener.isDefined
    +  }
    +
    +  /**
    +   * Is the service hooked up to an ATS server. This does not
    +   * check the validity of the link, only whether or not the service
    +   * has been set up to talk to ATS.
    +   * @return true if the service has a timeline client
    +   */
    +  def bondedToATS: Boolean = {
    +    _timelineClient.isDefined
    +  }
    +
    +  /**
    +   * Set the YARN binding information. This is called during startup. It is private
    +   * to the package so that tests may update this data
    +   * @param appId YARN application ID
    +   * @param attemptId optional attempt ID
    +   */
    +  private[yarn] def bindToYarnApplication(appId: ApplicationId,
    +      attemptId: Option[ApplicationAttemptId]): Unit = {
    +    require(appId != null, "Null appId parameter")
    +    _applicationId = appId
    +    _attemptId = attemptId
    +  }
    +
    +  /**
    +   * Set the "spark" application and attempt information -the information
    +   * provided in the start event. The attempt ID here may be `None`; even
    +   * if set it may only be unique amongst the attempts of this application.
    +   * That is: not unique enough to be used as the entity ID
    +   * @param appId application ID
    +   * @param attemptId attempt ID
    +   */
    +  private def setContextAppAndAttemptInfo(appId: Option[String],
    --- End diff --
    
    I'm confused as to why you need both `bindToYarnApplication` and this. In what situation
would `_applicationId` be different than `sparkApplicationId`? If that situation exists, it
sounds like a bug to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message