carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvramana <...@git.apache.org>
Subject [GitHub] carbondata pull request #1616: [CARBONDATA-1851][WIP] Code refactored for be...
Date Thu, 07 Dec 2017 09:42:58 GMT
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1616#discussion_r155471634
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---
    @@ -151,58 +136,76 @@ object CarbonSession {
               return session
             }
     
    -        // No active nor global default session. Create a new one.
    -        val sparkContext = userSuppliedContext.getOrElse {
    -          // set app name if not given
    -          val randomAppName = java.util.UUID.randomUUID().toString
    -          val sparkConf = new SparkConf()
    -          options.foreach { case (k, v) => sparkConf.set(k, v) }
    -          if (!sparkConf.contains("spark.app.name")) {
    -            sparkConf.setAppName(randomAppName)
    +        // Global synchronization so we will only set the default session once.
    +        SparkSession.synchronized {
    +          // If the current thread does not have an active session, get it from the global
session.
    +          session = SparkSession.getDefaultSession match {
    +            case Some(sparkSession: CarbonSession) =>
    +              if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped)
{
    +                options
    +                  .foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k,
v) }
    +                sparkSession
    +              } else {
    +                null
    +              }
    +            case _ => null
               }
    -          val sc = SparkContext.getOrCreate(sparkConf)
    -          // maybe this is an existing SparkContext, update its SparkConf which maybe
used
    -          // by SparkSession
    -          options.foreach { case (k, v) => sc.conf.set(k, v) }
    -          if (!sc.conf.contains("spark.app.name")) {
    -            sc.conf.setAppName(randomAppName)
    +          if (session ne null) {
    +            return session
               }
    -          sc
    -        }
     
    -        session = new CarbonSession(sparkContext)
    -        val carbonProperties = CarbonProperties.getInstance()
    -        if (storePath != null) {
    -          carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
    -          // In case if it is in carbon.properties for backward compatible
    -        } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)
== null) {
    -          carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
    -            session.sessionState.conf.warehousePath)
    -        }
    -        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k,
v) }
    -        SparkSession.setDefaultSession(session)
    -        try {
    -          CommonUtil.cleanInProgressSegments(
    -            carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION), sparkContext)
    -        } catch {
    -          case e: Throwable =>
    -            // catch all exceptions to avoid CarbonSession initialization failure
    -          LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    -            .error(e, "Failed to clean in progress segments")
    -        }
    -        // Register a successfully instantiated context to the singleton. This should
be at the
    -        // end of the class definition so that the singleton is updated only if there
is no
    -        // exception in the construction of the instance.
    -        sparkContext.addSparkListener(new SparkListener {
    -          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd):
Unit = {
    -            SparkSession.setDefaultSession(null)
    -            SparkSession.sqlListener.set(null)
    +          // No active nor global default session. Create a new one.
    +          val sparkContext = userSuppliedContext.getOrElse {
    +            // set app name if not given
    +            val randomAppName = java.util.UUID.randomUUID().toString
    +            val sparkConf = new SparkConf()
    +            options.foreach { case (k, v) => sparkConf.set(k, v) }
    +            if (!sparkConf.contains("spark.app.name")) {
    +              sparkConf.setAppName(randomAppName)
    +            }
    +            val sc = SparkContext.getOrCreate(sparkConf)
    +            // maybe this is an existing SparkContext, update its SparkConf which maybe
used
    +            // by SparkSession
    +            options.foreach { case (k, v) => sc.conf.set(k, v) }
    +            if (!sc.conf.contains("spark.app.name")) {
    +              sc.conf.setAppName(randomAppName)
    +            }
    +            sc
               }
    -        })
    -        session.streams.addListener(new CarbonStreamingQueryListener(session))
    -      }
     
    -      session
    +          session = new CarbonSession(sparkContext)
    --- End diff --
    
    Why these changes?


---

Mime
View raw message