spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-20926][SQL] Removing exposures to guava library caused by directly accessing SessionCatalog's tableRelationCache
Date Mon, 05 Jun 2017 20:19:07 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 acd4481ec -> 1388fdd70


[SPARK-20926][SQL] Removing exposures to guava library caused by directly accessing SessionCatalog's tableRelationCache

Test failures can occur when Guava shading is in place, because DataSourceStrategy, HiveMetastoreCatalog, and HiveSchemaInferenceSuite were exposed to the Guava library by directly accessing SessionCatalog's tableRelationCache.
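For context, a minimal sketch of the pre-patch call pattern (simplified from the DataSourceStrategy diff below; `sparkSession`, `qualifiedTableName`, and `callable` stand in for the real surrounding code):

```scala
// Pre-patch pattern (simplified): the caller compiles against Guava's
// com.google.common.cache.Cache because tableRelationCache exposes that type
// directly, so relocating (shading) Guava breaks these call sites.
val cache = sparkSession.sessionState.catalog.tableRelationCache  // Guava Cache[QualifiedTableName, LogicalPlan]
val plan = cache.get(qualifiedTableName, callable)                // Guava Cache.get(K, Callable[V])
```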

## What changes were proposed in this pull request?
This change removes those Guava exposures by introducing proxy methods in SessionCatalog and updating DataSourceStrategy, HiveMetastoreCatalog, and HiveSchemaInferenceSuite to use them, as sketched below.
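
A condensed sketch of the caller-side pattern after the change (the proxy method names are taken from the diff below; the `lookupOrBuild` helper and the import locations are illustrative assumptions for branch-2.2):

```scala
import java.util.concurrent.Callable

import org.apache.spark.sql.catalyst.catalog.{QualifiedTableName, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical caller: goes through SessionCatalog's proxy methods instead of
// touching the Guava-typed tableRelationCache field directly.
def lookupOrBuild(
    catalog: SessionCatalog,
    key: QualifiedTableName,
    build: () => LogicalPlan): LogicalPlan = {
  catalog.getCachedPlan(key, new Callable[LogicalPlan] {
    override def call(): LogicalPlan = build()
  })
}

// Other call sites switch analogously:
//   catalog.getCachedTable(key)            // was tableRelationCache.getIfPresent(key)
//   catalog.cacheTable(key, plan)          // was tableRelationCache.put(key, plan)
//   catalog.invalidateCachedTable(key)     // was tableRelationCache.invalidate(key)
//   catalog.invalidateAllCachedTables()    // was tableRelationCache.invalidateAll()
```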

## How was this patch tested?

Unit tests passed after applying these changes.

Author: Reza Safi <rezasafi@cloudera.com>

Closes #18148 from rezasafi/branch-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1388fdd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1388fdd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1388fdd7

Branch: refs/heads/branch-2.2
Commit: 1388fdd70733b92488f32bb31d2eb2ea4155ee62
Parents: acd4481
Author: Reza Safi <rezasafi@cloudera.com>
Authored: Mon Jun 5 13:19:01 2017 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Mon Jun 5 13:19:01 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 31 +++++++++++++++++---
 .../datasources/DataSourceStrategy.scala        |  4 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 16 +++++-----
 .../sql/hive/HiveSchemaInferenceSuite.scala     |  2 +-
 4 files changed, 38 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1388fdd7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a78440d..57006bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 import java.net.URI
 import java.util.Locale
+import java.util.concurrent.Callable
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -125,14 +126,36 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
   }
 
-  /**
-   * A cache of qualified table names to table relation plans.
-   */
-  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
+  private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     val cacheSize = conf.tableRelationCacheSize
     CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
   }
 
+  /** This method provides a way to get a cached plan. */
+  def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
+    tableRelationCache.get(t, c)
+  }
+
+  /** This method provides a way to get a cached plan if the key exists. */
+  def getCachedTable(key: QualifiedTableName): LogicalPlan = {
+    tableRelationCache.getIfPresent(key)
+  }
+
+  /** This method provides a way to cache a plan. */
+  def cacheTable(t: QualifiedTableName, l: LogicalPlan): Unit = {
+    tableRelationCache.put(t, l)
+  }
+
+  /** This method provides a way to invalidate a cached plan. */
+  def invalidateCachedTable(key: QualifiedTableName): Unit = {
+    tableRelationCache.invalidate(key)
+  }
+
+  /** This method provides a way to invalidate all the cached plans. */
+  def invalidateAllCachedTables(): Unit = {
+    tableRelationCache.invalidateAll()
+  }
+
   /**
    * This method is used to make the given path qualified before we
    * store this path in the underlying external catalog. So, when a path

http://git-wip-us.apache.org/repos/asf/spark/blob/1388fdd7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 21d75a4..e05a8d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,9 +215,9 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
     val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val cache = sparkSession.sessionState.catalog.tableRelationCache
+    val catalogProxy = sparkSession.sessionState.catalog
 
-    val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =

http://git-wip-us.apache.org/repos/asf/spark/blob/1388fdd7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6b98066..9b3cbb6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
   // these are def_s and not val/lazy val since the latter would introduce circular references
   private def sessionState = sparkSession.sessionState
-  private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
+  private def catalogProxy = sparkSession.sessionState.catalog
   import HiveMetastoreCatalog._
 
  /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
@@ -61,7 +61,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val key = QualifiedTableName(
       table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
       table.table.toLowerCase)
-    tableRelationCache.getIfPresent(key)
+    catalogProxy.getCachedTable(key)
   }
 
   private def getCached(
@@ -71,7 +71,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       expectedFileFormat: Class[_ <: FileFormat],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
-    tableRelationCache.getIfPresent(tableIdentifier) match {
+    catalogProxy.getCachedTable(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -92,21 +92,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               Some(logical)
             } else {
               // If the cached relation is not updated, we invalidate it right away.
-              tableRelationCache.invalidate(tableIdentifier)
+              catalogProxy.invalidateCachedTable(tableIdentifier)
               None
             }
           case _ =>
             logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat.
" +
               s"However, we are getting a ${relation.fileFormat} from the metastore cache.
" +
               "This cached entry will be invalidated.")
-            tableRelationCache.invalidate(tableIdentifier)
+            catalogProxy.invalidateCachedTable(tableIdentifier)
             None
         }
       case other =>
         logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
           s"However, we are getting a $other from the metastore cache. " +
           "This cached entry will be invalidated.")
-        tableRelationCache.invalidate(tableIdentifier)
+        catalogProxy.invalidateCachedTable(tableIdentifier)
         None
     }
   }
@@ -176,7 +176,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             fileFormat = fileFormat,
             options = options)(sparkSession = sparkSession)
           val created = LogicalRelation(fsRelation, updatedTable)
-          tableRelationCache.put(tableIdentifier, created)
+          catalogProxy.cacheTable(tableIdentifier, created)
           created
         }
 
@@ -205,7 +205,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 className = fileType).resolveRelation(),
               table = updatedTable)
 
-          tableRelationCache.put(tableIdentifier, created)
+          catalogProxy.cacheTable(tableIdentifier, created)
           created
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1388fdd7/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
index b3a0604..d271acc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -46,7 +46,7 @@ class HiveSchemaInferenceSuite
 
   override def afterEach(): Unit = {
     super.afterEach()
-    spark.sessionState.catalog.tableRelationCache.invalidateAll()
+    spark.sessionState.catalog.invalidateAllCachedTables()
     FileStatusCache.resetForTesting()
   }
 



