spark-reviews mailing list archives

From gatorsmile <...@git.apache.org>
Subject [GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...
Date Mon, 27 Mar 2017 05:32:38 GMT
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17433#discussion_r108093020
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
    @@ -179,88 +132,295 @@ private[sql] class SessionState(
       }
     }
     
    -
     private[sql] object SessionState {
    +  /**
    +   * Create a new [[SessionState]] for the given session.
    +   */
    +  def apply(session: SparkSession): SessionState = {
    +    new SessionStateBuilder(session).build
    +  }
    +
    +  def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
    +    val newHadoopConf = new Configuration(hadoopConf)
    +    sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) newHadoopConf.set(k, v) }
    +    newHadoopConf
    +  }
    +}
    +
    +/**
    + * Builder class that coordinates construction of a new [[SessionState]].
    + *
    + * The builder explicitly defines all components needed by the session state, and creates a
    + * session state when `build` is called. Components should only be initialized once. This is
    + * not a problem for most components as they are only used in the `build` function. However,
    + * some components (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & `sqlParser`)
    + * are used as dependencies for other components and are shared as a result. These components
    + * are defined as lazy vals to make sure each component is created only once.
    + *
    + * A developer can modify the builder by providing custom versions of components, or by using
    + * the hooks provided for the analyzer, optimizer & planner. There are some dependencies between
    + * the components (they are documented per dependency); a developer should respect these when
    + * making modifications in order to prevent initialization problems.
    + *
    + * A parent [[SessionState]] can be used to initialize the new [[SessionState]]. The new session
    + * state will clone the parent session state's `conf`, `functionRegistry`, `experimentalMethods`
    + * and `catalog` fields. Note that the state is cloned when `build` is called, and not before.
    + */
    +@Experimental
    +@InterfaceStability.Unstable
    +abstract class BaseSessionStateBuilder(
    +    val session: SparkSession,
    +    val parentState: Option[SessionState] = None) {
    +  type NewBuilder = (SparkSession, Option[SessionState]) => BaseSessionStateBuilder
     
    -  def apply(sparkSession: SparkSession): SessionState = {
    -    apply(sparkSession, new SQLConf)
    +  /**
    +   * Extract entries from `SparkConf` and put them in the `SQLConf`
    +   */
    +  protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
    +    sparkConf.getAll.foreach { case (k, v) =>
    +      sqlConf.setConfString(k, v)
    +    }
       }
     
    -  def apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState = {
    -    val sparkContext = sparkSession.sparkContext
    +  /**
    +   * SQL-specific key-value configurations.
    +   *
    +   * These either get cloned from a pre-existing instance or newly created. The conf is
    +   * always merged with its [[SparkConf]].
    +   */
    +  protected lazy val conf: SQLConf = {
    +    val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
    +    mergeSparkConf(conf, session.sparkContext.conf)
    +    conf
    +  }
     
    -    // Automatically extract all entries and put them in our SQLConf
    -    mergeSparkConf(sqlConf, sparkContext.getConf)
    +  /**
    +   * Internal catalog managing functions registered by the user.
    +   *
    +   * This either gets cloned from a pre-existing version or cloned from the built-in registry.
    +   */
    +  protected lazy val functionRegistry: FunctionRegistry = {
    +    parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
    +  }
     
    -    val functionRegistry = FunctionRegistry.builtin.clone()
    +  /**
    +   * Experimental methods that can be used to define custom optimization rules and custom
    +   * planning strategies.
    +   *
    +   * This either gets cloned from a pre-existing version or newly created.
    +   */
    +  protected lazy val experimentalMethods: ExperimentalMethods = {
    +    parentState.map(_.experimentalMethods.clone()).getOrElse(new ExperimentalMethods)
    +  }
     
    -    val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
    +  /**
    +   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
    +   *
    +   * Note: this depends on the `conf` field.
    +   */
    +  protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
     
    +  /**
    +   * Catalog for managing table and database states. If there is a pre-existing catalog, the
    +   * state of that catalog (temp tables & current database) will be copied into the new catalog.
    +   *
    +   * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.
    +   */
    +  protected lazy val catalog: SessionCatalog = {
         val catalog = new SessionCatalog(
    -      sparkSession.sharedState.externalCatalog,
    -      sparkSession.sharedState.globalTempViewManager,
    +      session.sharedState.externalCatalog,
    +      session.sharedState.globalTempViewManager,
           functionRegistry,
    -      sqlConf,
    -      newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
    -      sqlParser)
    +      conf,
    +      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
    +      sqlParser,
    +      new SessionFunctionResourceLoader(session))
    +    parentState.foreach(_.catalog.copyStateTo(catalog))
    +    catalog
    +  }
     
    -    val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf)
    +  /**
    +   * Logical query plan analyzer for resolving unresolved attributes and relations.
    +   *
    +   * Note: this depends on the `conf` and `catalog` fields.
    +   */
    +  protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
    +    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    +      new FindDataSourceTable(session) +:
    +      new ResolveSQLOnFile(session) +:
    +      customResolutionRules
    +
    +    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    +      PreprocessTableCreation(session) +:
    +      PreprocessTableInsertion(conf) +:
    +      DataSourceAnalysis(conf) +:
    +      customPostHocResolutionRules
    +
    +    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    +      PreWriteCheck +:
    +      HiveOnlyCheck +:
    +      customCheckRules
    +  }
     
    -    val streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(sparkSession)
    +  /**
    +   * Custom resolution rules to add to the Analyzer. Prefer overriding this instead of
    +   * creating your own Analyzer.
    +   *
    +   * Note that this may NOT depend on the `analyzer` function.
    +   */
    +  protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil
     
    -    val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(sparkSession, plan)
    +  /**
    +   * Custom post resolution rules to add to the Analyzer. Prefer overriding this instead of
    +   * creating your own Analyzer.
    +   *
    +   * Note that this may NOT depend on the `analyzer` function.
    +   */
    +  protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
    +
    +  /**
    +   * Custom check rules to add to the Analyzer. Prefer overriding this instead of creating
    +   * your own Analyzer.
    +   *
    +   * Note that this may NOT depend on the `analyzer` function.
    +   */
    +  protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil
    +
    +  /**
    +   * Logical query plan optimizer.
    +   *
    +   * Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields.
    +   */
    +  protected def optimizer: Optimizer = {
    +    new SparkOptimizer(catalog, conf, experimentalMethods) {
    +      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    +        super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
    +    }
    +  }
    +
    +  /**
    +   * Custom operator optimization rules to add to the Optimizer. Prefer overriding this
    +   * instead of creating your own Optimizer.
    +   *
    +   * Note that this may NOT depend on the `optimizer` function.
    +   */
    +  protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
     
    -    val sessionState = new SessionState(
    -      sparkContext,
    -      sparkSession.sharedState,
    -      sqlConf,
    -      new ExperimentalMethods,
    +  /**
    +   * Planner that converts optimized logical plans to physical plans.
    +   *
    +   * Note: this depends on the `conf` and `experimentalMethods` fields.
    +   */
    +  protected def planner: SparkPlanner = {
    +    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
    +      override def extraPlanningStrategies: Seq[Strategy] =
    +        super.extraPlanningStrategies ++ customPlanningStrategies
    +    }
    +  }
    +
    +  /**
    +   * Custom strategies to add to the planner. Prefer overriding this instead of creating
    +   * your own Planner.
    +   *
    +   * Note that this may NOT depend on the `planner` function.
    +   */
    +  protected def customPlanningStrategies: Seq[Strategy] = Nil
    +
    +  /**
    +   * Create a query execution object.
    +   */
    +  protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
    +    new QueryExecution(session, plan)
    +  }
    +
    +  /**
    +   * Interface to start and stop streaming queries.
    +   */
    +  protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session)
    +
    +  /**
    +   * Function that produces a new instance of the SessionStateBuilder. This is used by the
    +   * [[SessionState]]'s clone functionality. Make sure to override this when implementing
    +   * your own [[SessionStateBuilder]].
    +   */
    +  protected def newBuilder: NewBuilder
    --- End diff --
    
    Nit: `newBuilder` -> `newBuilder()`
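    
    Side note for anyone who wants to try these hooks out: a minimal downstream builder under
    the API in this diff could look roughly like the sketch below. The rule, strategy and class
    names are hypothetical, purely for illustration.
    
    ```scala
    import org.apache.spark.sql.{SparkSession, Strategy}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}
    
    // Hypothetical no-op analyzer rule, purely for illustration.
    object MyResolutionRule extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan
    }
    
    // Hypothetical strategy that never matches, so planning falls through
    // to the built-in strategies.
    object MyStrategy extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
    }
    
    // Hypothetical builder that layers extra behaviour on top of the defaults
    // via the hooks, instead of overriding `analyzer`/`planner` directly.
    class MySessionStateBuilder(
        session: SparkSession,
        parentState: Option[SessionState] = None)
      extends BaseSessionStateBuilder(session, parentState) {
    
      override protected def customResolutionRules: Seq[Rule[LogicalPlan]] =
        Seq(MyResolutionRule)
    
      override protected def customPlanningStrategies: Seq[Strategy] =
        Seq(MyStrategy)
    
      // Needed so that the clone functionality can recreate this builder.
      override protected def newBuilder: NewBuilder =
        new MySessionStateBuilder(_, _)
    }
    ```
    
    The intent (as the class doc above says) is that extensions go through the `custom*` hooks
    rather than overriding `analyzer`, `optimizer` or `planner` directly, so the lazy
    initialization of the shared components stays intact.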


