Date: Tue, 30 Jun 2020 08:20:41 +0000
From: wenchen@apache.org
To: "commits@spark.apache.org"
Subject: [spark] branch branch-3.0 updated: [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 22e3433  [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory
22e3433 is described below

commit 22e34336da50220073d83768903726e619489942
Author: Jungtaek Lim (HeartSaVioR)
AuthorDate: Tue Jun 30 08:09:18 2020 +0000

    [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory

    ### What changes were proposed in this pull request?

    This patch fixes a missed spot: the test initialized FileStreamSinkLog with the sink's "output" directory instead of its "metadata" directory, so the verification against the sink log was a no-op.

    ### Why are the changes needed?

    Without the fix, the verification against the sink log is a no-op.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Checked with a debugger in the test and verified that `allFiles()` now returns non-zero entries. (Before the fix it returned zero entries, as there is no metadata in the output directory.)

    Closes #28930 from HeartSaVioR/SPARK-29999-FOLLOWUP-fix-test.
    Authored-by: Jungtaek Lim (HeartSaVioR)
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 5472170a2b35864c617bdb846ff7123533765a16)
    Signed-off-by: Wenchen Fan
---
 .../sql/execution/streaming/FileStreamSink.scala   | 19 +++++++++++--------
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 10 ++++++----
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b679f16..86a3194 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,8 +45,7 @@ object FileStreamSink extends Logging {
           val hdfsPath = new Path(singlePath)
           val fs = hdfsPath.getFileSystem(hadoopConf)
           if (fs.isDirectory(hdfsPath)) {
-            val metadataPath = new Path(hdfsPath, metadataDir)
-            checkEscapedMetadataPath(fs, metadataPath, sqlConf)
+            val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
             fs.exists(metadataPath)
           } else {
             false
@@ -55,6 +54,12 @@
     }
   }
 
+  def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = {
+    val metadataDir = new Path(path, FileStreamSink.metadataDir)
+    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf)
+    metadataDir
+  }
+
   def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
     if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED) &&
         StreamExecution.containsSpecialCharsInPath(metadataPath)) {
@@ -125,14 +130,12 @@ class FileStreamSink(
     partitionColumnNames: Seq[String],
     options: Map[String, String]) extends Sink with Logging {
 
+  import FileStreamSink._
+
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
-  private val logPath = {
-    val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
-    val fs = metadataDir.getFileSystem(hadoopConf)
-    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
-    metadataDir
-  }
+  private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
+    sparkSession.sessionState.conf)
   private val fileLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 8779651..aa2664c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest {
        }
      }
 
-      val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
-        spark.sessionState.newHadoopConf())
-      val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
-        outputDir.getCanonicalPath)
+      val outputDirPath = new Path(outputDir.getCanonicalPath)
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      val fs = outputDirPath.getFileSystem(hadoopConf)
+      val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf)
+
+      val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
       val allFiles = sinkLog.allFiles()
       // only files from non-empty partition should be logged
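For readers following along, the gist of the fix: FileStreamSink keeps its commit log under the `_spark_metadata` subdirectory of the output path, so a FileStreamSinkLog opened on the output directory itself sees zero entries and any assertion over `allFiles()` passes vacuously. A minimal sketch of the wrong vs. right lookup, assuming a local Spark session; the output path here is illustrative and not part of the patch:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{FileStreamSink, FileStreamSinkLog}

val spark = SparkSession.builder().master("local[2]").appName("sink-log-sketch").getOrCreate()

val outputDir = "/tmp/stream-output"  // hypothetical output path of a file stream sink
val outputPath = new Path(outputDir)
val fs = outputPath.getFileSystem(spark.sessionState.newHadoopConf())

// Before the fix: the log is opened on the data directory itself. There is no
// metadata there, so allFiles() is empty and any verification over it is a no-op.
val noOpLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, outputDir)
assert(noOpLog.allFiles().isEmpty)

// After the fix: resolve the real log directory (<output>/_spark_metadata) with the
// extracted helper, which also applies the escaped-path check, then read from there.
val logPath = FileStreamSink.getMetadataLogPath(fs, outputPath, spark.sessionState.conf)
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
println(s"committed files: ${sinkLog.allFiles().length}")  // non-zero once batches have committed
```

This mirrors what the updated suite now does, which is why `allFiles()` returns non-empty entries under the debugger after the fix.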