spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] steveloughran commented on a change in pull request #25863: [SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate result and support concurrent file source write operations write to different partitions in the same table.
Date Mon, 30 Sep 2019 13:34:22 GMT
steveloughran commented on a change in pull request #25863: [SPARK-28945][SPARK-29037][CORE][SQL]
Fix the issue that spark gives duplicate result and support concurrent file source write operations
write to different partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329578757
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##########
 @@ -272,3 +351,60 @@ class HadoopMapReduceCommitProtocol(
     }
   }
 }
+
+object  HadoopMapReduceCommitProtocol extends Logging {
+
+  /**
+   * Get a path according to specified partition key-value pairs.
+   */
+  def getEscapedStaticPartitionPath(staticPartitionKVs: Iterable[(String, String)]): String = {
+    staticPartitionKVs.map{kv =>
+      kv._1 + "=" + kv._2
+    }.mkString(File.separator)
+  }
+
+  /**
+   * Delete the staging output path of current InsertIntoHadoopFsRelation operation. This output
+   * path is used to mark a InsertIntoHadoopFsRelation operation and we can detect conflict when
+   * there are several operations write same partition or a non-partitioned table concurrently.
+   *
+   * The output path is a multi level path and is composed of specified partition key value pairs
+   * formatted `.spark-staging-${depth}/p1=v1/p2=v2/.../pn=vn/appId/jobId`. When deleting the
+   * staging output path, delete the last level with recursive firstly. Then try to delete upper
+   * level without recursive, if success, then delete upper level with same way, until delete the
+   * insertStagingDir.
+   */
+   def deleteStagingInsertOutputPath(
+       fs: FileSystem,
+       insertStagingDir: Path,
+       stagingOutputDir: Path,
+       escapedStaticPartitionKVs: Seq[(String, String)]): Unit = {
+     if (insertStagingDir == null || stagingOutputDir ==null || !fs.exists(stagingOutputDir) ||
 
 Review comment:
   Call fs.getFileStatus once and check the returned status for the directory state; otherwise
   you are doing two wrapped getFileStatus calls back to back.
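
   A minimal sketch of what this could look like, under the assumption that the truncated
   condition above follows `fs.exists` with a directory check (the quoted diff is cut off, so
   that part is a guess). `StagingDirCheck` and `isExistingDirectory` are hypothetical names
   used only for illustration and are not part of the pull request. The sketch relies on
   Hadoop's `FileSystem.getFileStatus`, which throws `FileNotFoundException` for a missing path,
   and on `FileStatus.isDirectory`.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper, not taken from the pull request.
object StagingDirCheck {

  /**
   * Returns true only if `path` exists and is a directory.
   * A single getFileStatus call answers both questions, instead of
   * calling fs.exists and then a second status-based check.
   */
  def isExistingDirectory(fs: FileSystem, path: Path): Boolean = {
    try {
      fs.getFileStatus(path).isDirectory
    } catch {
      // getFileStatus signals a missing path with FileNotFoundException.
      case _: FileNotFoundException => false
    }
  }
}
```

   On object stores each status lookup can translate into one or more remote requests, so
   collapsing the two checks into one call may matter more there than on HDFS.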

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services