carbondata-issues mailing list archives

From zzcclp <...@git.apache.org>
Subject [GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Date Tue, 07 Nov 2017 03:23:39 GMT
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r149264766
  
    --- Diff: integration/spark2.2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
    +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ScalarSubquery}
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.SparkOptimizer
    +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
    +import org.apache.spark.sql.internal.{SQLConf, SessionState}
    +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
    +import org.apache.spark.sql.parser.CarbonSparkSqlParser
    +
    +import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    +
    +/**
    + * This class holds the carbon catalog and refreshes the relation from the cache if the carbon
    + * table in the carbon catalog is not the same as the cached carbon relation's carbon table.
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonSessionCatalog(
    +    externalCatalog: HiveExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends HiveSessionCatalog(
    +    externalCatalog,
    +    globalTempViewManager,
    +    new HiveMetastoreCatalog(sparkSession),
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) {
    +
    +  lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +
    +
    +  private def refreshRelationFromCache(identifier: TableIdentifier,
    +      alias: Option[String],
    +      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
    +    var isRefreshed = false
    +    val storePath = CarbonEnv.getInstance(sparkSession).storePath
    +    carbonEnv.carbonMetastore.
    +      checkSchemasModifiedTimeAndReloadTables(storePath)
    +
    +    val tableMeta = carbonEnv.carbonMetastore
    +      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
    +        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
    +    if (tableMeta.isEmpty || (tableMeta.isDefined &&
    +                              tableMeta.get.carbonTable.getTableLastUpdatedTime !=
    +                              carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
    +      refreshTable(identifier)
    +      DataMapStoreManager.getInstance().
    +        clearDataMap(AbsoluteTableIdentifier.from(storePath,
    +          identifier.database.getOrElse("default"), identifier.table))
    +      isRefreshed = true
    +      logInfo(s"Schema changes have been detected for table: $identifier")
    +    }
    +    isRefreshed
    +  }
    +
    +  //  override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
    +  //    makeFunctionBuilder(funcName, Utils.classForName(className))
    +  //  }
    +  //
    +  //  /**
    +  //   * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
    +  //   */
    +  //  private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
    +  //    // When we instantiate hive UDF wrapper class, we may throw exception if the input
    +  //    // expressions don't satisfy the hive UDF, such as type mismatch, input number
    +  //    // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
    +  //    (children: Seq[Expression]) => {
    +  //      try {
    +  //        if (classOf[UDF].isAssignableFrom(clazz)) {
    +  //          val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
    +  //          udf.dataType // Force it to check input data types.
    +  //          udf
    +  //        } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
    +  //          val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
    +  //          udf.dataType // Force it to check input data types.
    +  //          udf
    +  //        } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
    +  //          val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
    +  //          udaf.dataType // Force it to check input data types.
    +  //          udaf
    +  //        } else if (classOf[UDAF].isAssignableFrom(clazz)) {
    +  //          val udaf = HiveUDAFFunction(
    +  //            name,
    +  //            new HiveFunctionWrapper(clazz.getName),
    +  //            children,
    +  //            isUDAFBridgeRequired = true)
    +  //          udaf.dataType  // Force it to check input data types.
    +  //          udaf
    +  //        } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
    +  //          val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
    +  //          udtf.elementSchema // Force it to check input data types.
    +  //          udtf
    +  //        } else {
    +  //          throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
    +  //        }
    +  //      } catch {
    +  //        case ae: AnalysisException =>
    +  //          throw ae
    +  //        case NonFatal(e) =>
    +  //          val analysisException =
    +  //            new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
    +  //          analysisException.setStackTrace(e.getStackTrace)
    +  //          throw analysisException
    +  //      }
    +  //    }
    +  //  }
    +  //
    +  //  override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
    +  //    try {
    +  //      lookupFunction0(name, children)
    +  //    } catch {
    +  //      case NonFatal(_) =>
    +  //        // SPARK-16228 ExternalCatalog may recognize `double`-type only.
    +  //        val newChildren = children.map { child =>
    +  //          if (child.dataType.isInstanceOf[DecimalType]) Cast(child, DoubleType) else child
    +  //        }
    +  //        lookupFunction0(name, newChildren)
    +  //    }
    +  //  }
    +  //
    +  //  private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
    +  //    val database = name.database.map(formatDatabaseName)
    +  //    val funcName = name.copy(database = database)
    +  //    Try(super.lookupFunction(funcName, children)) match {
    +  //      case Success(expr) => expr
    +  //      case Failure(error) =>
    +  //        if (functionRegistry.functionExists(funcName.unquotedString)) {
    +  //          // If the function actually exists in functionRegistry, it means that there is an
    +  //          // error when we create the Expression using the given children.
    +  //          // We need to throw the original exception.
    +  //          throw error
    +  //        } else {
    +  //          // This function is not in functionRegistry, let's try to load it as a Hive's
    +  //          // built-in function.
    +  //          // Hive is case insensitive.
    +  //          val functionName = funcName.unquotedString.toLowerCase(Locale.ROOT)
    +  //          if (!hiveFunctions.contains(functionName)) {
    +  //            failFunctionLookup(funcName)
    +  //          }
    +  //
    +  //          // TODO: Remove this fallback path once we implement the list of fallback functions
    +  //          // defined below in hiveFunctions.
    +  //          val functionInfo = {
    +  //            try {
    +  //              Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
    +  //                failFunctionLookup(funcName))
    +  //            } catch {
    +  //              // If HiveFunctionRegistry.getFunctionInfo throws an exception,
    +  //              // we are failing to load a Hive builtin function, which means that
    +  //              // the given function is not a Hive builtin function.
    +  //              case NonFatal(e) => failFunctionLookup(funcName)
    +  //            }
    +  //          }
    +  //          val className = functionInfo.getFunctionClass.getName
    +  //          val functionIdentifier =
    +  //            FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database)
    +  //          val func = CatalogFunction(functionIdentifier, className, Nil)
    +  //          // Put this Hive built-in function to our function registry.
    +  //          registerFunction(func, ignoreIfExists = false)
    +  //          // Now, we need to create the Expression.
    +  //          functionRegistry.lookupFunction(functionName, children)
    +  //        }
    +  //    }
    +  //  }
    +  //
    +  //  // TODO Removes this method after implementing Spark native "histogram_numeric".
    +  //  override def functionExists(name: FunctionIdentifier): Boolean = {
    +  //    super.functionExists(name) || hiveFunctions.contains(name.funcName)
    +  //  }
    +  //
    +  //  /** List of functions we pass over to Hive. Note that over time this list should go to 0. */
    +  //  // We have a list of Hive built-in functions that we do not support. So, we will check
    +  //  // Hive's function registry and lazily load needed functions into our own function registry.
    +  //  // List of functions we are explicitly not supporting are:
    +  //  // compute_stats, context_ngrams, create_union,
    +  //  // current_user, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
    +  //  // in_file, index, matchpath, ngrams, noop, noopstreaming, noopwithmap,
    +  //  // noopwithmapstreaming, parse_url_tuple, reflect2, windowingtablefunction.
    +  //  // Note: don't forget to update SessionCatalog.isTemporaryFunction
    +  //  private val hiveFunctions = Seq(
    +  //    "histogram_numeric"
    +  //  )
    +}
    +
    +/**
    + * Session state builder that overrides the SQL parser and registers the Carbon strategies and optimizer rules.
    + *
    + * @param sparkSession
    + */
    +class CarbonSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None)
    +  extends HiveSessionStateBuilder(sparkSession, parentState) {
    +
    +  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
    +
    +  experimentalMethods.extraStrategies =
    +    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
    +  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
    +
    +  /**
    +   * Internal catalog for managing table and database states.
    +   */
    +  override lazy val catalog : CarbonSessionCatalog= {
    --- End diff --
    
    whitespace missing before '='
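    
    A minimal sketch (not part of the diff) of how the flagged declaration could read once the
    spacing is fixed, assuming the intent is the usual Scala style of no space before ':' and a
    space on each side of '=':
    
        // hypothetical corrected form of the flagged line; the body is unchanged
        override lazy val catalog: CarbonSessionCatalog = {
    
    For wider context, a hedged sketch of how such a builder is typically consumed on Spark 2.2;
    the object name below is illustrative, and CarbonSession's actual wiring in this PR may differ
    (for example, it may instantiate the builder via reflection):
    
        // Placed in the same package as the diff's classes so that Spark's
        // package-private session-state types are accessible.
        package org.apache.spark.sql.hive
    
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.internal.SessionState
    
        object CarbonSessionStateSketch {
          def main(args: Array[String]): Unit = {
            // Assumes the classes from this diff are on the classpath and compile against
            // Spark 2.2; BaseSessionStateBuilder.build() assembles the SessionState.
            val spark = SparkSession.builder()
              .master("local[2]")
              .appName("carbon-session-state-sketch")
              .enableHiveSupport()
              .getOrCreate()
            val state: SessionState = new CarbonSessionStateBuilder(spark).build()
            // The parser overridden above should be the CarbonSparkSqlParser from this diff.
            println(state.sqlParser.getClass.getName)
            spark.stop()
          }
        }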


---
