spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SQL] Minor readability improvement for partition handling code
Date Thu, 22 Dec 2016 07:45:26 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 07e2a17d1 -> def3690f6


[SQL] Minor readability improvement for partition handling code

This patch includes minor changes to improve readability of the partition handling code. I'm
in the middle of implementing a new feature and found some of the naming and implicit type
inference unintuitive.

This patch should introduce no semantic changes, and the changes should be covered by existing
test cases.

Author: Reynold Xin <rxin@databricks.com>

Closes #16378 from rxin/minor-fix.

(cherry picked from commit 7c5b7b3a2e5a7c1b2d0d8ce655840cad581e47ac)
Signed-off-by: Reynold Xin <rxin@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/def3690f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/def3690f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/def3690f

Branch: refs/heads/branch-2.1
Commit: def3690f6889979226478bf9c35a240d7e0662e6
Parents: 07e2a17
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu Dec 22 15:29:56 2016 +0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Dec 21 23:45:16 2016 -0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      |  7 +-
 .../datasources/CatalogFileIndex.scala          | 11 +--
 .../sql/execution/datasources/FileFormat.scala  |  3 +-
 .../execution/datasources/FileStatusCache.scala | 72 ++++++++++----------
 4 files changed, 49 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/def3690f/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e485b52..7616164 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -136,7 +136,7 @@ case class RowDataSourceScanExec(
  * @param outputSchema Output schema of the scan.
  * @param partitionFilters Predicates to use for partition pruning.
  * @param dataFilters Data source filters to use for filtering data within partitions.
- * @param metastoreTableIdentifier
+ * @param metastoreTableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
     @transient relation: HadoopFsRelation,
@@ -147,10 +147,10 @@ case class FileSourceScanExec(
     override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
-  val supportsBatch = relation.fileFormat.supportBatch(
+  val supportsBatch: Boolean = relation.fileFormat.supportBatch(
     relation.sparkSession, StructType.fromAttributes(output))
 
-  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+  val needsUnsafeRowConversion: Boolean = if (relation.fileFormat.isInstanceOf[ParquetSource])
{
     SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
   } else {
     false
@@ -516,7 +516,6 @@ case class FileSourceScanExec(
     }
 
     // Assign files to partitions using "First Fit Decreasing" (FFD)
-    // TODO: consider adding a slop factor here?
     splitFiles.foreach { file =>
       if (currentSize + file.length > maxSplitBytes) {
         closePartition()

http://git-wip-us.apache.org/repos/asf/spark/blob/def3690f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 4ad91dc..1235a4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
@@ -37,14 +38,15 @@ class CatalogFileIndex(
     val table: CatalogTable,
     override val sizeInBytes: Long) extends FileIndex {
 
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+  protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf()
 
-  private val fileStatusCache = FileStatusCache.newCache(sparkSession)
+  /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing.
*/
+  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
 
   assert(table.identifier.database.isDefined,
     "The table identifier must be qualified in CatalogFileIndex")
 
-  private val baseLocation = table.storage.locationUri
+  private val baseLocation: Option[String] = table.storage.locationUri
 
   override def partitionSchema: StructType = table.partitionSchema
 
@@ -76,7 +78,8 @@ class CatalogFileIndex(
       new PrunedInMemoryFileIndex(
         sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
     } else {
-      new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None)
+      new InMemoryFileIndex(
+        sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/def3690f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 4f4aaaa..6784ee2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -148,7 +148,8 @@ trait FileFormat {
  * The base class file format that is based on text file.
  */
 abstract class TextBasedFileFormat extends FileFormat {
-  private var codecFactory: CompressionCodecFactory = null
+  private var codecFactory: CompressionCodecFactory = _
+
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],

http://git-wip-us.apache.org/repos/asf/spark/blob/def3690f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index 7c2e6fd..5d97558 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
@@ -26,9 +25,38 @@ import com.google.common.cache._
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
+import org.apache.spark.util.SizeEstimator
+
+
+/**
+ * Use [[FileStatusCache.getOrCreate()]] to construct a globally shared file status cache.
+ */
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = _
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   *         shared across all clients.
+   */
+  def getOrCreate(session: SparkSession): FileStatusCache = synchronized {
+    if (session.sqlContext.conf.manageFilesourcePartitions &&
+      session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
+      if (sharedCache == null) {
+        sharedCache = new SharedInMemoryCache(
+          session.sqlContext.conf.filesourcePartitionFileCacheSize)
+      }
+      sharedCache.createForNewClient()
+    } else {
+      NoopCache
+    }
+  }
+
+  def resetForTesting(): Unit = synchronized {
+    sharedCache = null
+  }
+}
+
 
 /**
  * A cache of the leaf files of partition directories. We cache these files in order to speed
@@ -55,32 +83,6 @@ abstract class FileStatusCache {
   def invalidateAll(): Unit
 }
 
-object FileStatusCache {
-  private var sharedCache: SharedInMemoryCache = null
-
-  /**
-   * @return a new FileStatusCache based on session configuration. Cache memory quota is
-   *         shared across all clients.
-   */
-  def newCache(session: SparkSession): FileStatusCache = {
-    synchronized {
-      if (session.sqlContext.conf.manageFilesourcePartitions &&
-          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
-        if (sharedCache == null) {
-          sharedCache = new SharedInMemoryCache(
-            session.sqlContext.conf.filesourcePartitionFileCacheSize)
-        }
-        sharedCache.getForNewClient()
-      } else {
-        NoopCache
-      }
-    }
-  }
-
-  def resetForTesting(): Unit = synchronized {
-    sharedCache = null
-  }
-}
 
 /**
  * An implementation that caches partition file statuses in memory.
@@ -88,7 +90,6 @@ object FileStatusCache {
  * @param maxSizeInBytes max allowable cache size before entries start getting evicted
  */
 private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
-  import FileStatusCache._
 
   // Opaque object that uniquely identifies a shared cache user
   private type ClientId = Object
@@ -102,8 +103,9 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
         (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
       }})
     .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
-      override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
= {
-        if (removed.getCause() == RemovalCause.SIZE &&
+      override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
+        : Unit = {
+        if (removed.getCause == RemovalCause.SIZE &&
             warnedAboutEviction.compareAndSet(false, true)) {
           logWarning(
             "Evicting cached table partition metadata from memory due to size constraints
" +
@@ -112,13 +114,13 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
         }
       }})
     .maximumWeight(maxSizeInBytes)
-    .build()
+    .build[(ClientId, Path), Array[FileStatus]]()
 
   /**
    * @return a FileStatusCache that does not share any entries with any other client, but
does
    *         share memory resources for the purpose of cache eviction.
    */
-  def getForNewClient(): FileStatusCache = new FileStatusCache {
+  def createForNewClient(): FileStatusCache = new FileStatusCache {
     val clientId = new Object()
 
     override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
@@ -126,7 +128,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
     }
 
     override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
-      cache.put((clientId, path), leafFiles.toArray)
+      cache.put((clientId, path), leafFiles)
     }
 
     override def invalidateAll(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message