carbondata-issues mailing list archives

From sounakr <...@git.apache.org>
Subject [GitHub] carbondata pull request #1469: [CARBONDATA-1552][Spark-2.2 Integration] Spar...
Date Mon, 27 Nov 2017 06:20:04 GMT
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153113370
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala
---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.reflect.runtime._
    +import scala.reflect.runtime.universe._
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.catalog.CatalogTable
    +import org.apache.spark.sql.catalyst.parser.AstBuilder
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.internal.{SessionState, SQLConf}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +import org.apache.spark.util.Utils
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +
    +/**
    + * Reflection APIs
    + */
    +
    +object CarbonClassReflectionUtils {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  /**
    +   * Returns the field val from a object through reflection.
    +   * @param name - name of the field being retrieved.
    +   * @param obj - Object from which the field has to be retrieved.
    +   * @tparam T
    +   * @return
    +   */
    +  def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +
    +    im.symbol.typeSignature.members.find(
    +      _.name.toString.equals(name)).map(
    +      l => im.reflectField(l.asTerm).get
    +        // .asInstanceOf[LogicalPlan]
    +    ).getOrElse(null)
    +  }
    +
    +  def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = {
    +    val hasField : Boolean = if (typeOf[T].members.filter(!_.isMethod).toList.contains(name)) {
    +      true
    +    } else {
    +      false
    +    }
    +    hasField
    +  }
    +
    +  def getUnresolvedRelation(tableIdentifier: TableIdentifier,
    +      tableAlias: Option[String] = None): UnresolvedRelation = {
    +
    +    val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
    +    try {
    +      // For 2.1
    +      clazz.getDeclaredField("alias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val unresolvedrelation = ctor
    +        .newInstance(tableIdentifier,
    +          Some(tableAlias.getOrElse(""))).asInstanceOf[UnresolvedRelation]
    +      unresolvedrelation
    +    } catch {
    +      case ce: NoSuchFieldException =>
    +        // For Spark-2.2
    +        val ctor = clazz.getConstructors.head
    +        ctor.setAccessible(true)
    +        val unresolvedrelation = ctor
    +          .newInstance(tableIdentifier).asInstanceOf[UnresolvedRelation]
    +        unresolvedrelation
    +    }
    +  }
    +
    +  def getSubqueryAlias(sparkSession: SparkSession, alias: Option[String],
    +      relation: LogicalPlan,
    +      view: Option[TableIdentifier]): SubqueryAlias = {
    +    if (sparkSession.version.contains("2.1")) {
    +      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +      //   Project(projList, relation), Option(table.tableIdentifier))
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val subqueryAlias = ctor
    +        .newInstance(alias.getOrElse(""), relation, Option(view)).asInstanceOf[SubqueryAlias]
    +      subqueryAlias
    +    } else if (sparkSession.version.contains("2.2")) {
    +      // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +      //  Project(projList, relation))
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val subqueryAlias = ctor
    +        .newInstance(alias.getOrElse(""), relation).asInstanceOf[SubqueryAlias]
    +      subqueryAlias
    +    } else {
    +      throw new UnsupportedOperationException("Unsupported Spark version")
    +    }
    +  }
    +
    +  def getOverWrite[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    +    var overwriteboolean: Boolean = false
    +    val im = rm.reflect(obj)
    +    for (l <- im.symbol.typeSignature.members.filter(_.name.toString.contains("enabled"))) {
    +      overwriteboolean = im.reflectField(l.asTerm).get.asInstanceOf[Boolean]
    +    }
    +    overwriteboolean
    +  }
    +
    +  def getOverWriteOption[T: TypeTag : reflect.ClassTag](name: String, obj: T): Boolean = {
    +    var overwriteboolean: Boolean = false
    +    val im = rm.reflect(obj)
    +    for (m <- typeOf[T].members.filter(!_.isMethod)) {
    +      if (m.toString.contains("overwrite")) {
    +        val typ = m.typeSignature
    +        if (typ.toString.contains("Boolean")) {
    +          // Spark2.2
    +          overwriteboolean = im.reflectField(m.asTerm).get.asInstanceOf[Boolean]
    +        } else {
    +          overwriteboolean = getOverWrite("enabled", im.reflectField(m.asTerm).get)
    +        }
    +      }
    +    }
    +    overwriteboolean
    +  }
    +
    +  def getFieldOfCatalogTable[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
    +    val im = rm.reflect(obj)
    +    val sym = im.symbol.typeSignature.member(TermName(name))
    +    val tableMeta = im.reflectMethod(sym.asMethod).apply()
    +    tableMeta
    +  }
    +
    +  def getAstBuilder(conf: SQLConf,
    +      sqlParser: CarbonSpark2SqlParser,
    +      sparkSession: SparkSession): AstBuilder = {
    +    if (sparkSession.version.contains("2.1")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else if (sparkSession.version.contains("2.2")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, sqlParser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else {
    +      throw new UnsupportedOperationException("Spark version not supported")
    +    }
    +  }
    +
    +  def getSessionState(sparkContext: SparkContext, carbonSession: CarbonSession): SessionState = {
    +    if (sparkContext.version.contains("2.1")) {
    --- End diff --
    
    Modified this to use equals instead of contains, and also specified the actual version, i.e. 2.1.0 or 2.2.0.
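    A minimal sketch of that kind of check (hypothetical names, not the actual patch), assuming the exact version strings in question are "2.1.0" and "2.2.0" as mentioned above:

        // Hypothetical sketch: match the exact Spark version string with equals
        // rather than a contains("2.1") / contains("2.2") substring check.
        object SparkVersionCheckSketch {
          def isSupportedVersion(version: String): Boolean =
            version.equals("2.1.0") || version.equals("2.2.0")

          def main(args: Array[String]): Unit = {
            println(isSupportedVersion("2.1.0")) // true
            println(isSupportedVersion("2.2.1")) // false: would have matched a contains("2.2") check
          }
        }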


---
