spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-25738][SQL] Fix LOAD DATA INPATH for hdfs port
Date Tue, 16 Oct 2018 01:34:54 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 b6e4aca0b -> d64b35588


[SPARK-25738][SQL] Fix LOAD DATA INPATH for hdfs port

## What changes were proposed in this pull request?

LOAD DATA INPATH didn't work if the defaultFS included a port for hdfs,
e.g. hdfs://fizz.buzz.com:8020. Handling this just requires a small change
to use the URI constructor that accepts a full authority (host plus port)
instead of a bare host.
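
As an illustrative sketch of the difference (hostname and path are made up,
borrowed from the new unit test): the four-argument java.net.URI constructor
treats its second argument as a bare host, so an authority containing a port
looks like an unbracketed IPv6 literal and fails to parse, while the
five-argument form accepts host:port.

```scala
import java.net.URI

// Four-argument form: (scheme, host, path, fragment). A host containing ':'
// is treated as an IPv6 literal and bracketed, so this throws
// java.net.URISyntaxException: Malformed IPv6 address.
// new URI("hdfs", "fizz.buzz.com:8020", "/flim/flam", null)

// Five-argument form: (scheme, authority, path, query, fragment). host:port
// is a legal authority, so the port survives.
val uri = new URI("hdfs", "fizz.buzz.com:8020", "/flim/flam", null, null)
assert(uri.toString == "hdfs://fizz.buzz.com:8020/flim/flam")
```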

## How was this patch tested?

Added a unit test; ran all tests via Jenkins.

Closes #22733 from squito/SPARK-25738.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit fdaa99897ac8755938d031896ae0eefb46ce7107)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d64b3558
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d64b3558
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d64b3558

Branch: refs/heads/branch-2.4
Commit: d64b355886b2dd3486729d51bac1a17abba3f64a
Parents: b6e4aca
Author: Imran Rashid <irashid@cloudera.com>
Authored: Mon Oct 15 18:34:30 2018 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Mon Oct 15 18:34:47 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/command/tables.scala  | 11 +++++++----
 .../apache/spark/sql/hive/execution/SQLQuerySuite.scala  |  8 ++++++++
 2 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d64b3558/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2eca1c4..64831e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -306,7 +306,8 @@ case class LoadDataCommand(
     val loadPath = {
       if (isLocal) {
         val localFS = FileContext.getLocalFSFileContext()
-        makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path))
+        LoadDataCommand.makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(),
+          new Path(path))
       } else {
         val loadPath = new Path(path)
         // Follow Hive's behavior:
@@ -323,7 +324,7 @@ case class LoadDataCommand(
         // by considering the wild card scenario in mind.as per old logic query param  is
        // been considered while creating URI instance and if path contains wild card char '?'
        // the remaining charecters after '?' will be removed while forming URI instance
-        makeQualified(defaultFS, uriPath, loadPath)
+        LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath)
       }
     }
     val fs = loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
@@ -363,7 +364,9 @@ case class LoadDataCommand(
     CommandUtils.updateTableStats(sparkSession, targetTable)
     Seq.empty[Row]
   }
+}
 
+object LoadDataCommand {
   /**
    * Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class.
    *
@@ -372,7 +375,7 @@ case class LoadDataCommand(
    * @param path       Path instance based on the path string specified by the user.
    * @return qualified path object
    */
-  private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
+  private[sql] def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = {
     val pathUri = if (path.isAbsolute()) path.toUri() else new Path(workingDir, path).toUri()
     if (pathUri.getScheme == null || pathUri.getAuthority == null &&
         defaultUri.getAuthority != null) {
@@ -383,7 +386,7 @@ case class LoadDataCommand(
         pathUri.getAuthority
       }
       try {
-        val newUri = new URI(scheme, authority, pathUri.getPath, pathUri.getFragment)
+        val newUri = new URI(scheme, authority, pathUri.getPath, null, pathUri.getFragment)
         new Path(newUri)
       } catch {
         case e: URISyntaxException =>
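
For context on the hunk above, a rough sketch of what the qualification step
amounts to after the change (simplified; the real makeQualified also resolves
relative paths against a working directory and wraps URISyntaxException):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Simplified sketch, not the actual method body: a missing scheme/authority
// falls back to the defaultFS values, and the five-argument URI constructor
// carries a host:port authority through unchanged.
def qualifySketch(defaultUri: URI, path: Path): Path = {
  val pathUri = path.toUri
  val scheme = Option(pathUri.getScheme).getOrElse(defaultUri.getScheme)
  val authority = Option(pathUri.getAuthority).getOrElse(defaultUri.getAuthority)
  new Path(new URI(scheme, authority, pathUri.getPath, null, pathUri.getFragment))
}

// qualifySketch(new URI("hdfs://fizz.buzz.com:8020"), new Path("/flim/flam"))
// returns Path("hdfs://fizz.buzz.com:8020/flim/flam"), port intact.
```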

http://git-wip-us.apache.org/repos/asf/spark/blob/d64b3558/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e49aea2..dfcde8c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import java.io.File
+import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.util.{Locale, Set}
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.command.LoadDataCommand
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -1985,6 +1987,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-25738: defaultFs can have a port") {
+    val defaultURI = new URI("hdfs://fizz.buzz.com:8020")
+    val r = LoadDataCommand.makeQualified(defaultURI, new Path("/foo/bar"), new Path("/flim/flam"))
+    assert(r === new Path("hdfs://fizz.buzz.com:8020/flim/flam"))
+  }
+
   test("Insert overwrite with partition") {
     withTable("tableWithPartition") {
       sql(
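
For reference, the user-facing behavior this fixes (session, table, and path
below are hypothetical): with fs.defaultFS set to an hdfs URI that includes a
port, LOAD DATA INPATH previously failed while qualifying the path; with this
change it resolves against the default filesystem as expected.

```scala
// Hypothetical example: assumes fs.defaultFS = hdfs://namenode:8020 and an
// existing Hive table `target_table`. Before the fix, path qualification
// could throw URISyntaxException (Malformed IPv6 address); now it succeeds.
spark.sql("LOAD DATA INPATH '/data/part-00000' INTO TABLE target_table")
```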


