spark-commits mailing list archives

From yh...@apache.org
Subject spark git commit: [SPARK-10540] Fixes flaky all-data-type test
Date Fri, 18 Sep 2015 19:19:28 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 2c6a51e14 -> e1e781f04


[SPARK-10540] Fixes flaky all-data-type test

This PR breaks the original test case into multiple ones (one test case per data type).
This way, test failure output is much more readable.

Within each test case, we build a table with two columns: one holds values of the data type
under test, and the other is an "index" column, which is used to sort the DataFrame and work
around [SPARK-10591] [1].

[1]: https://issues.apache.org/jira/browse/SPARK-10591
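
For readers skimming the diff below, here is a compact, self-contained sketch of the
pattern this commit introduces. It is hypothetical and simplified: it targets the modern
SparkSession API (the commit itself targets the 1.5-era SQLContext), writes Parquet in
place of the suite's dataSourceName, and uses a null placeholder instead of
RandomDataGenerator.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.scalatest.funsuite.AnyFunSuite

// Sketch only: one generated test case per data type, each round-tripping a
// two-column table whose "index" column makes the comparison order-insensitive.
class AllDataTypesSketch extends AnyFunSuite {
  private lazy val spark =
    SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()

  // Trimmed-down type list; the real suite also filters with supportsDataType.
  private val dataTypes: Seq[DataType] =
    Seq(StringType, IntegerType, ArrayType(IntegerType))

  for (dt <- dataTypes) {
    test(s"round-trip - $dt") {
      // The "index" column provides a deterministic sort key, which is the
      // SPARK-10591 workaround described in the commit message.
      val schema = new StructType()
        .add("index", IntegerType, nullable = false)
        .add("col", dt, nullable = true)
      val rows = (1 to 10).map(i => Row(i, null)) // null stands in for random data
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(rows), schema).coalesce(1)

      val path = java.nio.file.Files
        .createTempDirectory("all-data-types").resolve(dt.typeName).toString
      df.write.mode("overwrite").parquet(path)

      // Sort both sides by "index" so the comparison ignores read order.
      val loaded = spark.read.schema(df.schema).parquet(path).orderBy("index")
      assert(loaded.collect().toSeq === df.orderBy("index").collect().toSeq)
    }
  }
}

Generating one test per type keeps each failure message scoped to a single DataType,
which is the readability win the commit message describes.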

Author: Cheng Lian <lian@databricks.com>

Closes #8768 from liancheng/spark-10540/test-all-data-types.

(cherry picked from commit 00a2911c5bea67a1a4796fb1d6fd5d0a8ee79001)
Signed-off-by: Yin Huai <yhuai@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1e781f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1e781f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1e781f0

Branch: refs/heads/branch-1.5
Commit: e1e781f04963a69f9b2d0be664ed2457016d94d2
Parents: 2c6a51e
Author: Cheng Lian <lian@databricks.com>
Authored: Fri Sep 18 12:19:08 2015 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Fri Sep 18 12:19:23 2015 -0700

----------------------------------------------------------------------
 .../sql/sources/hadoopFsRelationSuites.scala    | 109 ++++++++-----------
 1 file changed, 43 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1e781f0/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 0f1cddd..14efe40 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -102,80 +102,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-            StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
         }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
+        // Create a DF for the schema with random data. The index field is used to sort the
+        // DataFrame.  This is a workaround for SPARK-10591.
+        val schema = new StructType()
+          .add("index", IntegerType, nullable = false)
+          .add("col", dataType, nullable = true)
+        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
 
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
+        df.write
+          .mode("overwrite")
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .save(path)
 
-      val loadedDF =
-        sqlContext
+        val loadedDF = sqlContext
           .read
           .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
+          .option("dataSchema", df.schema.json)
+          .schema(df.schema)
+          .load(path)
+          .orderBy("index")
 
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
+        checkAnswer(loadedDF, df)
+      }
     }
   }
 

