spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-18510] Fix data corruption from inferred partition column dataTypes
Date Wed, 23 Nov 2016 19:49:04 GMT
Repository: spark
Updated Branches:
  refs/heads/master f129ebcd3 -> 0d1bf2b6c


[SPARK-18510] Fix data corruption from inferred partition column dataTypes

## What changes were proposed in this pull request?

### The Issue

If I specify my schema when doing
```scala
spark.read
  .schema(someSchemaWherePartitionColumnsAreStrings)
```
but if the partition inference can infer it as IntegerType or I assume LongType or DoubleType
(basically fixed size types), then once UnsafeRows are generated, your data will be corrupted.

### Proposed solution

The partition handling code path is kind of a mess. In my fix I'm probably adding to the mess,
but at least trying to standardize the code path.

The real issue is that a user that uses the `spark.read` code path can never clearly specify
what the partition columns are. If you try to specify the fields in `schema`, we practically
ignore what the user provides, and fall back to our inferred data types. What happens in the
end is data corruption.

My solution tries to fix this by always trying to infer partition columns the first time you
specify the table. Once we find what the partition columns are, we try to find them in the
user specified schema and use the dataType provided there, or fall back to the smallest common
data type.

We will ALWAYS append partition columns to the user's schema, even if they didn't ask for
it. We will only use the data type they provided if they specified it. While this is confusing,
this has been the behavior since Spark 1.6, and I didn't want to change this behavior in the
QA period of Spark 2.1. We may revisit this decision later.

A side effect of this PR is that we won't need https://github.com/apache/spark/pull/15942
if this PR goes in.

## How was this patch tested?

Regression tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15951 from brkyvz/partition-corruption.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d1bf2b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d1bf2b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d1bf2b6

Branch: refs/heads/master
Commit: 0d1bf2b6c8ac4d4141d7cef0552c22e586843c57
Parents: f129ebc
Author: Burak Yavuz <brkyvz@gmail.com>
Authored: Wed Nov 23 11:48:59 2016 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Nov 23 11:48:59 2016 -0800

----------------------------------------------------------------------
 R/pkg/inst/tests/testthat/test_sparkSQL.R       |   2 +-
 .../sql/execution/datasources/DataSource.scala  | 159 ++++++++++++-------
 .../spark/sql/execution/command/DDLSuite.scala  |   2 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |   2 +-
 .../test/DataStreamReaderWriterSuite.scala      |  45 +++++-
 .../sql/test/DataFrameReaderWriterSuite.scala   |  38 ++++-
 6 files changed, 190 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d1bf2b6/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ee48baa..c669c2e 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2684,7 +2684,7 @@ test_that("Call DataFrameWriter.load() API in Java without path and
check argume
   # It makes sure that we can omit path argument in read.df API and then it calls
   # DataFrameWriter.load() without path.
   expect_error(read.df(source = "json"),
-               paste("Error in loadDF : analysis error - Unable to infer schema for JSON
at .",
+               paste("Error in loadDF : analysis error - Unable to infer schema for JSON.",
                      "It must be specified manually"))
   expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not
exist")
   expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not
exist")

http://git-wip-us.apache.org/repos/asf/spark/blob/0d1bf2b6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 84fde0b..dbc3e71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -61,8 +61,12 @@ import org.apache.spark.util.Utils
  *              qualified. This option only works when reading from a [[FileFormat]].
  * @param userSpecifiedSchema An optional specification of the schema of the data. When present
  *                            we skip attempting to infer the schema.
- * @param partitionColumns A list of column names that the relation is partitioned by. When
this
- *                         list is empty, the relation is unpartitioned.
+ * @param partitionColumns A list of column names that the relation is partitioned by. This
list is
+ *                         generally empty during the read path, unless this DataSource is
managed
+ *                         by Hive. In these cases, during `resolveRelation`, we will call
+ *                         `getOrInferFileFormatSchema` for file based DataSources to infer
the
+ *                         partitioning. In other cases, if this list is empty, then this
table
+ *                         is unpartitioned.
  * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
  * @param catalogTable Optional catalog table reference that can be used to push down operations
  *                     over the datasource to the catalog service.
@@ -84,30 +88,106 @@ case class DataSource(
   private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 
   /**
-   * Infer the schema of the given FileFormat, returns a pair of schema and partition column
names.
+   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try
to infer
+   * it. In the read path, only managed tables by Hive provide the partition columns properly
when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns
exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
+   * This method will try to skip file scanning whether `userSpecifiedSchema` and
+   * `partitionColumns` are provided. Here are some code paths that use this method:
+   *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning
columns
+   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them
to the
+   *     dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
+   *     dataType if they don't.
+   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users
have to
+   *     provide the schema. Here, we also perform partition inference like 2, and try to
use
+   *     dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will
re-use
+   *     this information, therefore calls to this method should be very cheap, i.e. there
won't
+   *     be any further inference in any triggers.
+   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve
the
+   *     existing table's partitioning scheme. This is achieved by not providing
+   *     `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for
an early
+   *     exit, if we don't care about the schema of the original table.
+   *
+   * @param format the file format object for this DataSource
+   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
+   * @return A pair of the data schema (excluding partition columns) and the schema of the
partition
+   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided
as
+   *         `null`.
    */
-  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
-    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val allPaths = caseInsensitiveOptions.get("path")
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      justPartitioning: Boolean = false): (StructType, StructType) = {
+    // the operations below are expensive therefore try not to do them if we don't need to
+    lazy val tempFileCatalog = {
+      val allPaths = caseInsensitiveOptions.get("path") ++ paths
+      val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
         val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
-      val partitionSchema = fileCatalog.partitionSpec().partitionColumns
-      val inferred = format.inferSchema(
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+    }
+    val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+      // Try to infer partitioning, because no DataSource in the read path provides the partitioning
+      // columns properly unless it is a Hive DataSource
+      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+        val equality = sparkSession.sessionState.conf.resolver
+        // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to
inferred
+        userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+          partitionField)
+      }
+      StructType(resolved)
+    } else {
+      // in streaming mode, we have already inferred and registered partition columns, we
will
+      // never have to materialize the lazy val below
+      lazy val inferredPartitions = tempFileCatalog.partitionSchema
+      // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
+      // partitioning
+      if (userSpecifiedSchema.isEmpty) {
+        inferredPartitions
+      } else {
+        val partitionFields = partitionColumns.map { partitionColumn =>
+          userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
+            val inferredOpt = inferredPartitions.find(_.name == partitionColumn)
+            if (inferredOpt.isDefined) {
+              logDebug(
+                s"""Type of partition column: $partitionColumn not found in specified schema
+                   |for $format.
+                   |User Specified Schema
+                   |=====================
+                   |${userSpecifiedSchema.orNull}
+                   |
+                   |Falling back to inferred dataType if it exists.
+                 """.stripMargin)
+            }
+            inferredPartitions.find(_.name == partitionColumn)
+          }.getOrElse {
+            throw new AnalysisException(s"Failed to resolve the schema for $format for "
+
+              s"the partition column: $partitionColumn. It must be specified manually.")
+          }
+        }
+        StructType(partitionFields)
+      }
+    }
+    if (justPartitioning) {
+      return (null, partitionSchema)
+    }
+    val dataSchema = userSpecifiedSchema.map { schema =>
+      val equality = sparkSession.sessionState.conf.resolver
+      StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name,
f.name))))
+    }.orElse {
+      format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
-        fileCatalog.allFiles())
-
-      inferred.map { inferredSchema =>
-        StructType(inferredSchema ++ partitionSchema) -> partitionSchema.map(_.name)
-      }
+        tempFileCatalog.allFiles())
     }.getOrElse {
-      throw new AnalysisException("Unable to infer schema. It must be specified manually.")
+      throw new AnalysisException(
+        s"Unable to infer schema for $format. It must be specified manually.")
     }
+    (dataSchema, partitionSchema)
   }
 
   /** Returns the name and schema of the source that can be used to continually read data.
*/
@@ -144,8 +224,8 @@ case class DataSource(
               "you may be able to create a static DataFrame on that directory with " +
               "'spark.read.load(directory)' and infer schema from it.")
         }
-        val (schema, partCols) = inferFileFormatSchema(format)
-        SourceInfo(s"FileSource[$path]", schema, partCols)
+        val (schema, partCols) = getOrInferFileFormatSchema(format)
+        SourceInfo(s"FileSource[$path]", StructType(schema ++ partCols), partCols.fieldNames)
 
       case _ =>
         throw new UnsupportedOperationException(
@@ -272,7 +352,7 @@ case class DataSource(
 
         HadoopFsRelation(
           fileCatalog,
-          partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+          partitionSchema = fileCatalog.partitionSchema,
           dataSchema = dataSchema,
           bucketSpec = None,
           format,
@@ -281,9 +361,10 @@ case class DataSource(
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
+        val hadoopConf = sparkSession.sessionState.newHadoopConf()
         val globbedPaths = allPaths.flatMap { path =>
           val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          val fs = hdfsPath.getFileSystem(hadoopConf)
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
           val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
@@ -291,23 +372,14 @@ case class DataSource(
             throw new AnalysisException(s"Path does not exist: $qualified")
           }
           // Sufficient to check head of the globPath seq for non-glob scenario
+          // Don't need to check once again if files exist in streaming mode
           if (checkFilesExist && !fs.exists(globPath.head)) {
             throw new AnalysisException(s"Path does not exist: ${globPath.head}")
           }
           globPath
         }.toArray
 
-        // If they gave a schema, then we try and figure out the types of the partition columns
-        // from that schema.
-        val partitionSchema = userSpecifiedSchema.map { schema =>
-          StructType(
-            partitionColumns.map { c =>
-              // TODO: Case sensitivity.
-              schema
-                  .find(_.name.toLowerCase() == c.toLowerCase())
-                  .getOrElse(throw new AnalysisException(s"Invalid partition column '$c'"))
-            })
-        }
+        val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
 
         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog)
{
@@ -316,27 +388,12 @@ case class DataSource(
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
         } else {
-          new InMemoryFileIndex(
-            sparkSession, globbedPaths, options, partitionSchema)
-        }
-
-        val dataSchema = userSpecifiedSchema.map { schema =>
-          val equality = sparkSession.sessionState.conf.resolver
-          StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
-        }.orElse {
-          format.inferSchema(
-            sparkSession,
-            caseInsensitiveOptions,
-            fileCatalog.asInstanceOf[InMemoryFileIndex].allFiles())
-        }.getOrElse {
-          throw new AnalysisException(
-            s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. "
+
-              "It must be specified manually")
+          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(inferredPartitionSchema))
         }
 
         HadoopFsRelation(
           fileCatalog,
-          partitionSchema = fileCatalog.partitionSchema,
+          partitionSchema = inferredPartitionSchema,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
@@ -384,11 +441,7 @@ case class DataSource(
         // up.  If we fail to load the table for whatever reason, ignore the check.
         if (mode == SaveMode.Append) {
           val existingPartitionColumns = Try {
-            resolveRelation()
-              .asInstanceOf[HadoopFsRelation]
-              .partitionSchema
-              .fieldNames
-              .toSeq
+            getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
           }.getOrElse(Seq.empty[String])
           // TODO: Case sensitivity.
           val sameColumns =

http://git-wip-us.apache.org/repos/asf/spark/blob/0d1bf2b6/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 02d9d15..10843e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -274,7 +274,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach
{
           pathToPartitionedTable,
           userSpecifiedSchema = Option("num int, str string"),
           userSpecifiedPartitionCols = partitionCols,
-          expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+          expectedSchema = new StructType().add("str", StringType).add("num", IntegerType),
           expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d1bf2b6/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a099153..bad6642 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -282,7 +282,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
           createFileStreamSourceAndGetSchema(
             format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
         }
-        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
+        assert("Unable to infer schema for JSON. It must be specified manually.;" === e.getMessage)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d1bf2b6/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 5630464..0eb95a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 object LastOptions {
@@ -532,4 +532,47 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter
{
     assert(e.getMessage.contains("does not support recovering"))
     assert(e.getMessage.contains("checkpoint location"))
   }
+
+  test("SPARK-18510: use user specified types for partition columns in file sources") {
+    import org.apache.spark.sql.functions.udf
+    import testImplicits._
+    withTempDir { src =>
+      val createArray = udf { (length: Long) =>
+        for (i <- 1 to length.toInt) yield i.toString
+      }
+      spark.range(4).select(createArray('id + 1) as 'ex, 'id, 'id % 4 as 'part).coalesce(1).write
+        .partitionBy("part", "id")
+        .mode("overwrite")
+        .parquet(src.toString)
+      // Specify a random ordering of the schema, partition column in the middle, etc.
+      // Also let's say that the partition columns are Strings instead of Longs.
+      // partition columns should go to the end
+      val schema = new StructType()
+        .add("id", StringType)
+        .add("ex", ArrayType(StringType))
+
+      val sdf = spark.readStream
+        .schema(schema)
+        .format("parquet")
+        .load(src.toString)
+
+      assert(sdf.schema.toList === List(
+        StructField("ex", ArrayType(StringType)),
+        StructField("part", IntegerType), // inferred partitionColumn dataType
+        StructField("id", StringType))) // used user provided partitionColumn dataType
+
+      val sq = sdf.writeStream
+        .queryName("corruption_test")
+        .format("memory")
+        .start()
+      sq.processAllAvailable()
+      checkAnswer(
+        spark.table("corruption_test"),
+        // notice how `part` is ordered before `id`
+        Row(Array("1"), 0, "0") :: Row(Array("1", "2"), 1, "1") ::
+          Row(Array("1", "2", "3"), 2, "2") :: Row(Array("1", "2", "3", "4"), 3, "3") ::
Nil
+      )
+      sq.stop()
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d1bf2b6/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index a7fda01..e0887e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 
@@ -573,4 +573,40 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext
with Be
       }
     }
   }
+
+  test("SPARK-18510: use user specified types for partition columns in file sources") {
+    import org.apache.spark.sql.functions.udf
+    import testImplicits._
+    withTempDir { src =>
+      val createArray = udf { (length: Long) =>
+        for (i <- 1 to length.toInt) yield i.toString
+      }
+      spark.range(4).select(createArray('id + 1) as 'ex, 'id, 'id % 4 as 'part).coalesce(1).write
+        .partitionBy("part", "id")
+        .mode("overwrite")
+        .parquet(src.toString)
+      // Specify a random ordering of the schema, partition column in the middle, etc.
+      // Also let's say that the partition columns are Strings instead of Longs.
+      // partition columns should go to the end
+      val schema = new StructType()
+        .add("id", StringType)
+        .add("ex", ArrayType(StringType))
+      val df = spark.read
+        .schema(schema)
+        .format("parquet")
+        .load(src.toString)
+
+      assert(df.schema.toList === List(
+        StructField("ex", ArrayType(StringType)),
+        StructField("part", IntegerType), // inferred partitionColumn dataType
+        StructField("id", StringType))) // used user provided partitionColumn dataType
+
+      checkAnswer(
+        df,
+        // notice how `part` is ordered before `id`
+        Row(Array("1"), 0, "0") :: Row(Array("1", "2"), 1, "1") ::
+          Row(Array("1", "2", "3"), 2, "2") :: Row(Array("1", "2", "3", "4"), 3, "3") ::
Nil
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message