spark-commits mailing list archives

From r...@apache.org
Subject [2/2] git commit: [SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL
Date Wed, 18 Jun 2014 02:15:29 GMT
[SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL

JIRA: https://issues.apache.org/jira/browse/SPARK-2060

Programming guide: http://yhuai.github.io/site/sql-programming-guide.html

Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #999 from yhuai/newJson and squashes the following commits:

227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.

(cherry picked from commit d2f4f30b12f99358953e2781957468e2cfe3c916)
Signed-off-by: Reynold Xin <rxin@apache.org>
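
Illustrative note (not part of the commit): the squashed commits above describe inferring a schema from a dataset with one JSON object per line, resolving type conflicts, and using StringType wherever only NullType was seen. The sketch below shows that general idea in plain Python. It is not the JsonRDD implementation: the function names, the Integer-to-Double widening rule, and the fallback to StringType for every other conflict are assumptions made for illustration, and arrays are not modelled.

```python
import json

# Type names chosen to resemble the printSchema() output shown in this commit.
# The set of types and the merge rules below are assumptions for illustration.
NULL, INTEGER, DOUBLE, BOOLEAN, STRING = (
    "NullType", "IntegerType", "DoubleType", "BooleanType", "StringType")


def infer_type(value):
    """Map one parsed JSON value to a type name, or to a dict for a nested object."""
    if value is None:
        return NULL
    if isinstance(value, bool):  # checked before int because bool subclasses int
        return BOOLEAN
    if isinstance(value, int):
        return INTEGER
    if isinstance(value, float):
        return DOUBLE
    if isinstance(value, dict):
        return {k: infer_type(v) for k, v in value.items()}
    return STRING  # strings; arrays are not modelled in this sketch


def merge(a, b):
    """Resolve two inferred types into one type that can hold both."""
    if a == b:
        return a
    if a == NULL:
        return b
    if b == NULL:
        return a
    if isinstance(a, dict) and isinstance(b, dict):
        # Union of the fields, sorted by name; a missing field counts as NullType.
        return {k: merge(a.get(k, NULL), b.get(k, NULL))
                for k in sorted(set(a) | set(b))}
    if isinstance(a, dict) or isinstance(b, dict):
        return STRING  # struct versus primitive: assumed to fall back to string
    if {a, b} == {INTEGER, DOUBLE}:
        return DOUBLE  # assumed numeric widening
    return STRING  # any other conflict: assumed to fall back to string


def finalize(t):
    """Replace any NullType left after merging with StringType."""
    if isinstance(t, dict):
        return {k: finalize(v) for k, v in t.items()}
    return STRING if t == NULL else t


def infer_schema(lines):
    """Infer a schema from an iterable of strings, one JSON object per string."""
    schema = {}
    for line in lines:
        schema = merge(schema, infer_type(json.loads(line)))
    return finalize(schema)


sample = ['{"name":"Michael"}', '{"name":"Andy","age":30}']
print(infer_schema(sample))  # {'age': 'IntegerType', 'name': 'StringType'}
```

Scanning every record, rather than a sample, matches the commit "Update docs and use the entire dataset to infer the schema", at the cost of a full pass over the data before the first query.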


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1e22b38
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1e22b38
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1e22b38

Branch: refs/heads/branch-1.0
Commit: d1e22b386839e6f81cfd83b1903b9dc8c4bbef64
Parents: ac6c10e
Author: Yin Huai <huai@cse.ohio-state.edu>
Authored: Tue Jun 17 19:14:59 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Tue Jun 17 19:15:09 2014 -0700

----------------------------------------------------------------------
 .rat-excludes                                   |   1 +
 docs/sql-programming-guide.md                   | 290 ++++++++---
 .../apache/spark/examples/sql/JavaSparkSQL.java |  78 ++-
 examples/src/main/resources/people.json         |   3 +
 project/SparkBuild.scala                        |  22 +-
 python/pyspark/sql.py                           |  64 ++-
 sql/catalyst/pom.xml                            |  28 +
 .../catalyst/analysis/HiveTypeCoercion.scala    |  25 +-
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  51 ++
 .../optimizer/CombiningLimitsSuite.scala        |   3 +-
 .../optimizer/ConstantFoldingSuite.scala        |   3 +-
 .../optimizer/FilterPushdownSuite.scala         |   5 +-
 .../sql/catalyst/optimizer/OptimizerTest.scala  |  55 --
 ...SimplifyCaseConversionExpressionsSuite.scala |   3 +-
 .../spark/sql/catalyst/plans/PlanTest.scala     |  54 ++
 sql/core/pom.xml                                |  12 +
 .../scala/org/apache/spark/sql/SQLContext.scala |  45 +-
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  38 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala    |   6 +
 .../spark/sql/api/java/JavaSQLContext.scala     |  20 +
 .../org/apache/spark/sql/json/JsonRDD.scala     | 397 ++++++++++++++
 .../scala/org/apache/spark/sql/QueryTest.scala  |   4 +-
 .../spark/sql/api/java/JavaSQLSuite.scala       |  45 ++
 .../org/apache/spark/sql/json/JsonSuite.scala   | 519 +++++++++++++++++++
 .../apache/spark/sql/json/TestJsonData.scala    |  84 +++
 25 files changed, 1694 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 1558970..4c5d560 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -21,6 +21,7 @@ spark-env.sh.template
 log4j-defaults.properties
 sorttable.js
 .*txt
+.*json
 .*data
 .*log
 cloudpickle.py

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4623bb4..522c838 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -17,20 +17,20 @@ Spark.  At the core of this component is a new type of RDD,
 [Row](api/scala/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects along with
 a schema that describes the data types of each column in the row.  A SchemaRDD is similar to a table
 in a traditional relational database.  A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
 
 All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.
 
 </div>
 
 <div data-lang="java"  markdown="1">
-Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using
+Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using
 Spark.  At the core of this component is a new type of RDD,
 [JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD).  JavaSchemaRDDs are composed
 [Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects along with
 a schema that describes the data types of each column in the row.  A JavaSchemaRDD is similar to a table
 in a traditional relational database.  A JavaSchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
 </div>
 
 <div data-lang="python"  markdown="1">
@@ -41,7 +41,7 @@ Spark.  At the core of this component is a new type of RDD,
 [Row](api/python/pyspark.sql.Row-class.html) objects along with
 a schema that describes the data types of each column in the row.  A SchemaRDD is similar to a table
 in a traditional relational database.  A SchemaRDD can be created from an existing RDD, [Parquet](http://parquet.io)
-file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
 
 All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.
 </div>
@@ -64,8 +64,8 @@ descendants.  To create a basic SQLContext, all you need is a SparkContext.
 val sc: SparkContext // An existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 
-// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
-import sqlContext._
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD
 {% endhighlight %}
 
 </div>
@@ -77,8 +77,8 @@ The entry point into all relational functionality in Spark is the
 of its descendants.  To create a basic JavaSQLContext, all you need is a JavaSparkContext.
 
 {% highlight java %}
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx);
+JavaSparkContext sc = ...; // An existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
 {% endhighlight %}
 
 </div>
@@ -91,14 +91,33 @@ of its decedents.  To create a basic SQLContext, all you need is a SparkContext.
 
 {% highlight python %}
 from pyspark.sql import SQLContext
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)
 {% endhighlight %}
 
 </div>
 
 </div>
 
-## Running SQL on RDDs
+# Data Sources
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+
+<div data-lang="java"  markdown="1">
+Spark SQL supports operating on a variety of data sources through the `JavaSchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+
+<div data-lang="python"  markdown="1">
+Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface.
+Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
+</div>
+</div>
+
+## RDDs
 
 <div class="codetabs">
 
@@ -111,8 +130,10 @@ types such as Sequences or Arrays. This RDD can be implicitly converted to a Sch
 registered as a table.  Tables can be used in subsequent SQL statements.
 
 {% highlight scala %}
+// sc is an existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD
 
 // Define the schema using a case class.
 // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, 
@@ -124,7 +145,7 @@ val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split("
 people.registerAsTable("people")
 
 // SQL statements can be run by using the sql methods provided by sqlContext.
-val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 
 // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
@@ -170,12 +191,11 @@ A schema can be applied to an existing RDD by calling `applySchema` and providin
 for the JavaBean.
 
 {% highlight java %}
-
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx)
+// sc is an existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
 
 // Load a text file and convert each line to a JavaBean.
-JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
+JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
   new Function<String, Person>() {
     public Person call(String line) throws Exception {
       String[] parts = line.split(",");
@@ -189,11 +209,11 @@ JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").
   });
 
 // Apply a schema to an RDD of JavaBeans and register it as a table.
-JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
 schemaPeople.registerAsTable("people");
 
 // SQL can be run over RDDs that have been registered as tables.
-JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
 
 // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
@@ -215,6 +235,10 @@ row. Any RDD of dictionaries can converted to a SchemaRDD and then registered as
 can be used in subsequent SQL statements.
 
 {% highlight python %}
+# sc is an existing SparkContext.
+from pyspark.sql import SQLContext
+sqlContext = SQLContext(sc)
+
 # Load a text file and convert each line to a dictionary.
 lines = sc.textFile("examples/src/main/resources/people.txt")
 parts = lines.map(lambda l: l.split(","))
@@ -223,14 +247,16 @@ people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})
 # Infer the schema, and register the SchemaRDD as a table.
 # In future versions of PySpark we would like to add support for registering RDDs with other
 # datatypes as tables
-peopleTable = sqlCtx.inferSchema(people)
-peopleTable.registerAsTable("people")
+schemaPeople = sqlContext.inferSchema(people)
+schemaPeople.registerAsTable("people")
 
 # SQL can be run over SchemaRDDs that have been registered as a table.
-teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 
 # The results of SQL queries are RDDs and support all the normal RDD operations.
 teenNames = teenagers.map(lambda p: "Name: " + p.name)
+for teenName in teenNames.collect():
+  print teenName
 {% endhighlight %}
 
 </div>
@@ -241,7 +267,7 @@ teenNames = teenagers.map(lambda p: "Name: " + p.name)
 Users that want a more complete dialect of SQL should look at the HiveQL support provided by
 `HiveContext`.
 
-## Using Parquet
+## Parquet Files
 
 [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
 Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
@@ -252,22 +278,23 @@ of the original data.  Using the data from the above example:
 <div data-lang="scala"  markdown="1">
 
 {% highlight scala %}
-val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._
+// sqlContext from the previous example is used in this example.
+// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
+import sqlContext.createSchemaRDD
 
 val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
 
-// The RDD is implicitly converted to a SchemaRDD, allowing it to be stored using Parquet.
+// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
 people.saveAsParquetFile("people.parquet")
 
 // Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
-// The result of loading a Parquet file is also a JavaSchemaRDD.
+// The result of loading a Parquet file is also a SchemaRDD.
 val parquetFile = sqlContext.parquetFile("people.parquet")
 
 //Parquet files can also be registered as tables and then used in SQL statements.
 parquetFile.registerAsTable("parquetFile")
-val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-teenagers.collect().foreach(println)
+val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
+teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
 {% endhighlight %}
 
 </div>
@@ -275,6 +302,7 @@ teenagers.collect().foreach(println)
 <div data-lang="java"  markdown="1">
 
 {% highlight java %}
+// sqlContext from the previous example is used in this example.
 
 JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.
 
@@ -283,13 +311,16 @@ schemaPeople.saveAsParquetFile("people.parquet");
 
 // Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
 // The result of loading a parquet file is also a JavaSchemaRDD.
-JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");
 
 //Parquet files can also be registered as tables and then used in SQL statements.
 parquetFile.registerAsTable("parquetFile");
-JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
-
-
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+  public String call(Row row) {
+    return "Name: " + row.getString(0);
+  }
+}).collect();
 {% endhighlight %}
 
 </div>
@@ -297,50 +328,149 @@ JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >=
 <div data-lang="python"  markdown="1">
 
 {% highlight python %}
+# sqlContext from the previous example is used in this example.
 
-peopleTable # The SchemaRDD from the previous example.
+schemaPeople # The SchemaRDD from the previous example.
 
 # SchemaRDDs can be saved as Parquet files, maintaining the schema information.
-peopleTable.saveAsParquetFile("people.parquet")
+schemaPeople.saveAsParquetFile("people.parquet")
 
 # Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
 # The result of loading a parquet file is also a SchemaRDD.
-parquetFile = sqlCtx.parquetFile("people.parquet")
+parquetFile = sqlContext.parquetFile("people.parquet")
 
 # Parquet files can also be registered as tables and then used in SQL statements.
 parquetFile.registerAsTable("parquetFile");
-teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
-
+teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
+teenNames = teenagers.map(lambda p: "Name: " + p.name)
+for teenName in teenNames.collect():
+  print teenName
 {% endhighlight %}
 
 </div>
 
 </div>
 
-## Writing Language-Integrated Relational Queries
+## JSON Datasets
+<div class="codetabs">
 
-**Language-Integrated queries are currently only supported in Scala.**
+<div data-lang="scala"  markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD.
+This conversion can be done using one of two methods in a SQLContext:
 
-Spark SQL also supports a domain specific language for writing queries.  Once again,
-using the data from the above examples:
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
 
 {% highlight scala %}
+// sc is an existing SparkContext.
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-import sqlContext._
-val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
 
-// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
-val teenagers = people.where('age >= 10).where('age <= 19).select('name)
+// A JSON dataset is pointed to by path.
+// The path can be either a single text file or a directory storing text files.
+val path = "examples/src/main/resources/people.json"
+// Create a SchemaRDD from the file(s) pointed to by path
+val people = sqlContext.jsonFile(path)
+
+// The inferred schema can be visualized using the printSchema() method.
+people.printSchema()
+// root
+//  |-- age: IntegerType
+//  |-- name: StringType
+
+// Register this SchemaRDD as a table.
+people.registerAsTable("people")
+
+// SQL statements can be run by using the sql methods provided by sqlContext.
+val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+// Alternatively, a SchemaRDD can be created for a JSON dataset represented by
+// an RDD[String] storing one JSON object per string.
+val anotherPeopleRDD = sc.parallelize(
+  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
+val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
 {% endhighlight %}
 
-The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers
-prefixed with a tick (`'`).  Implicit conversions turn these symbols into expressions that are
-evaluated by the SQL execution engine.  A full list of the functions supported can be found in the
-[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).
+</div>
 
-<!-- TODO: Include the table of operations here. -->
+<div data-lang="java"  markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a JavaSchemaRDD.
+This conversion can be done using one of two methods in a JavaSQLContext:
 
-# Hive Support
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
+
+{% highlight java %}
+// sc is an existing JavaSparkContext.
+JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
+
+// A JSON dataset is pointed to by path.
+// The path can be either a single text file or a directory storing text files.
+String path = "examples/src/main/resources/people.json";
+// Create a JavaSchemaRDD from the file(s) pointed to by path
+JavaSchemaRDD people = sqlContext.jsonFile(path);
+
+// The inferred schema can be visualized using the printSchema() method.
+people.printSchema();
+// root
+//  |-- age: IntegerType
+//  |-- name: StringType
+
+// Register this JavaSchemaRDD as a table.
+people.registerAsTable("people");
+
+// SQL statements can be run by using the sql methods provided by sqlContext.
+JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+
+// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
+// an RDD[String] storing one JSON object per string.
+List<String> jsonData = Arrays.asList(
+  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
+JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
+JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
+{% endhighlight %}
+</div>
+
+<div data-lang="python"  markdown="1">
+Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD.
+This conversion can be done using one of two methods in a SQLContext:
+
+* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
+* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
+
+{% highlight python %}
+# sc is an existing SparkContext.
+from pyspark.sql import SQLContext
+sqlContext = SQLContext(sc)
+
+# A JSON dataset is pointed to by path.
+# The path can be either a single text file or a directory storing text files.
+path = "examples/src/main/resources/people.json"
+# Create a SchemaRDD from the file(s) pointed to by path
+people = sqlContext.jsonFile(path)
+
+# The inferred schema can be visualized using the printSchema() method.
+people.printSchema()
+# root
+#  |-- age: IntegerType
+#  |-- name: StringType
+
+# Register this SchemaRDD as a table.
+people.registerAsTable("people")
+
+# SQL statements can be run by using the sql methods provided by sqlContext.
+teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+# Alternatively, a SchemaRDD can be created for a JSON dataset represented by
+# an RDD[String] storing one JSON object per string.
+anotherPeopleRDD = sc.parallelize([
+  '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
+anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
+{% endhighlight %}
+</div>
+
+</div>
+
+## Hive Tables
 
 Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/).
 However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
@@ -362,17 +492,14 @@ which is similar to `HiveContext`, but creates a local copy of the `metastore` a
 automatically.
 
 {% highlight scala %}
-val sc: SparkContext // An existing SparkContext.
+// sc is an existing SparkContext.
 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 
-// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
-import hiveContext._
-
-hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
 
 // Queries are expressed in HiveQL
-hql("FROM src SELECT key, value").collect().foreach(println)
+hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)
 {% endhighlight %}
 
 </div>
@@ -385,14 +512,14 @@ the `sql` method a `JavaHiveContext` also provides an `hql` methods, which allow
 expressed in HiveQL.
 
 {% highlight java %}
-JavaSparkContext ctx = ...; // An existing JavaSparkContext.
-JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.HiveContext(ctx);
+// sc is an existing JavaSparkContext.
+JavaHiveContext hiveContext = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
 
-hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
-hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
 
 // Queries are expressed in HiveQL.
-Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect();
+Row[] results = hiveContext.hql("FROM src SELECT key, value").collect();
 
 {% endhighlight %}
 
@@ -406,17 +533,44 @@ the `sql` method a `HiveContext` also provides an `hql` methods, which allows qu
 expressed in HiveQL.
 
 {% highlight python %}
-
+# sc is an existing SparkContext.
 from pyspark.sql import HiveContext
-hiveCtx = HiveContext(sc)
+hiveContext = HiveContext(sc)
 
-hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
 
 # Queries can be expressed in HiveQL.
-results = hiveCtx.hql("FROM src SELECT key, value").collect()
+results = hiveContext.hql("FROM src SELECT key, value").collect()
 
 {% endhighlight %}
 
 </div>
 </div>
+
+
+# Writing Language-Integrated Relational Queries
+
+**Language-Integrated queries are currently only supported in Scala.**
+
+Spark SQL also supports a domain specific language for writing queries.  Once again,
+using the data from the above examples:
+
+{% highlight scala %}
+// sc is an existing SparkContext.
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
+import sqlContext._
+val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
+
+// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
+val teenagers = people.where('age >= 10).where('age <= 19).select('name)
+teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
+{% endhighlight %}
+
+The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers
+prefixed with a tick (`'`).  Implicit conversions turn these symbols into expressions that are
+evaluated by the SQL execution engine.  A full list of the functions supported can be found in the
+[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).
+
+<!-- TODO: Include the table of operations here. -->
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index ad5ec84..607df3e 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -18,6 +18,7 @@
 package org.apache.spark.examples.sql;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.spark.SparkConf;
@@ -56,6 +57,7 @@ public class JavaSparkSQL {
     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
     JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
 
+    System.out.println("=== Data source: RDD ===");
     // Load a text file and convert each line to a Java Bean.
     JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
       new Function<String, Person>() {
@@ -84,16 +86,88 @@ public class JavaSparkSQL {
         return "Name: " + row.getString(0);
       }
     }).collect();
+    for (String name: teenagerNames) {
+      System.out.println(name);
+    }
 
+    System.out.println("=== Data source: Parquet File ===");
     // JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
     schemaPeople.saveAsParquetFile("people.parquet");
 
-    // Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
+    // Read in the parquet file created above.
+    // Parquet files are self-describing so the schema is preserved.
     // The result of loading a parquet file is also a JavaSchemaRDD.
     JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
 
     //Parquet files can also be registered as tables and then used in SQL statements.
     parquetFile.registerAsTable("parquetFile");
-    JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+    JavaSchemaRDD teenagers2 =
+      sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+    teenagerNames = teenagers2.map(new Function<Row, String>() {
+      public String call(Row row) {
+          return "Name: " + row.getString(0);
+      }
+    }).collect();
+    for (String name: teenagerNames) {
+      System.out.println(name);
+    }
+
+    System.out.println("=== Data source: JSON Dataset ===");
+    // A JSON dataset is pointed to by path.
+    // The path can be either a single text file or a directory storing text files.
+    String path = "examples/src/main/resources/people.json";
+    // Create a JavaSchemaRDD from the file(s) pointed to by path
+    JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
+
+    // Because the schema of a JSON dataset is inferred automatically, it helps to
+    // inspect the schema before writing queries.
+    peopleFromJsonFile.printSchema();
+    // The schema of people is ...
+    // root
+    //  |-- age: IntegerType
+    //  |-- name: StringType
+
+    // Register this JavaSchemaRDD as a table.
+    peopleFromJsonFile.registerAsTable("people");
+
+    // SQL statements can be run by using the sql method provided by sqlCtx.
+    JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+
+    // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
+    // The columns of a row in the result can be accessed by ordinal.
+    teenagerNames = teenagers3.map(new Function<Row, String>() {
+      public String call(Row row) { return "Name: " + row.getString(0); }
+    }).collect();
+    for (String name: teenagerNames) {
+      System.out.println(name);
+    }
+
+    // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
+    // an RDD[String] storing one JSON object per string.
+    List<String> jsonData = Arrays.asList(
+          "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
+    JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
+    JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD);
+
+    // Take a look at the schema of this new JavaSchemaRDD.
+    peopleFromJsonRDD.printSchema();
+    // The schema of peopleFromJsonRDD is ...
+    // root
+    //  |-- address: StructType
+    //  |    |-- city: StringType
+    //  |    |-- state: StringType
+    //  |-- name: StringType
+
+    peopleFromJsonRDD.registerAsTable("people2");
+
+    JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+    List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
+      public String call(Row row) {
+        return "Name: " + row.getString(0) + ", City: " + row.getString(1);
+      }
+    }).collect();
+    for (String name: nameAndCity) {
+      System.out.println(name);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/examples/src/main/resources/people.json
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/people.json b/examples/src/main/resources/people.json
new file mode 100644
index 0000000..50a859c
--- /dev/null
+++ b/examples/src/main/resources/people.json
@@ -0,0 +1,3 @@
+{"name":"Michael"}
+{"name":"Andy", "age":30}
+{"name":"Justin", "age":19}
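The new people.json holds one JSON object per line, and its first record has no "age" field. A minimal plain-Python sketch (no Spark involved) of which rows the teenager query in the Java example would select from this file; the helper name teenager_names is hypothetical, and skipping records without an age is an assumption modelled on how a SQL comparison against NULL does not select a row.

```python
import json

# The three records from people.json, one JSON object per line.
PEOPLE_JSON = """{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}"""


def teenager_names(json_lines):
    """Return names of records whose age is between 13 and 19 inclusive.

    A record without an "age" key is skipped (assumption: this mirrors a SQL
    comparison against NULL, which does not select the row).
    """
    names = []
    for line in json_lines.splitlines():
        record = json.loads(line)
        age = record.get("age")
        if age is not None and 13 <= age <= 19:
            names.append(record["name"])
    return names


print(teenager_names(PEOPLE_JSON))
```

Only Justin falls inside the age range; Michael is dropped because his record carries no age at all.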

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c0e3bba..d71771a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -72,7 +72,7 @@ object SparkBuild extends Build {
 
   lazy val catalyst = Project("catalyst", file("sql/catalyst"), settings = catalystSettings) dependsOn(core)
 
-  lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core, catalyst)
+  lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core) dependsOn(catalyst % "compile->compile;test->test")
 
   lazy val hive = Project("hive", file("sql/hive"), settings = hiveSettings) dependsOn(sql)
 
@@ -484,9 +484,23 @@ object SparkBuild extends Build {
   def sqlCoreSettings = sharedSettings ++ Seq(
     name := "spark-sql",
     libraryDependencies ++= Seq(
-      "com.twitter" % "parquet-column" % parquetVersion,
-      "com.twitter" % "parquet-hadoop" % parquetVersion
-    )
+      "com.twitter"                  % "parquet-column"             % parquetVersion,
+      "com.twitter"                  % "parquet-hadoop"             % parquetVersion,
+      "com.fasterxml.jackson.core"   % "jackson-databind"           % "2.3.0" // json4s-jackson 3.2.6 requires jackson-databind 2.3.0.
+    ),
+    initialCommands in console :=
+      """
+        |import org.apache.spark.sql.catalyst.analysis._
+        |import org.apache.spark.sql.catalyst.dsl._
+        |import org.apache.spark.sql.catalyst.errors._
+        |import org.apache.spark.sql.catalyst.expressions._
+        |import org.apache.spark.sql.catalyst.plans.logical._
+        |import org.apache.spark.sql.catalyst.rules._
+        |import org.apache.spark.sql.catalyst.types._
+        |import org.apache.spark.sql.catalyst.util._
+        |import org.apache.spark.sql.execution
+        |import org.apache.spark.sql.test.TestSQLContext._
+        |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
   )
 
   // Since we don't include hive in the main assembly this project also acts as an alternative

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index c31d49c..5051c82 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from pyspark.rdd import RDD
+from pyspark.rdd import RDD, PipelinedRDD
 from pyspark.serializers import BatchedSerializer, PickleSerializer
 
 from py4j.protocol import Py4JError
@@ -137,6 +137,53 @@ class SQLContext:
         jschema_rdd = self._ssql_ctx.parquetFile(path)
         return SchemaRDD(jschema_rdd, self)
 
+
+    def jsonFile(self, path):
+        """Loads a text file storing one JSON object per line,
+           returning the result as a L{SchemaRDD}.
+           It goes through the entire dataset once to determine the schema.
+
+        >>> import tempfile, shutil
+        >>> jsonFile = tempfile.mkdtemp()
+        >>> shutil.rmtree(jsonFile)
+        >>> ofn = open(jsonFile, 'w')
+        >>> for json in jsonStrings:
+        ...   print>>ofn, json
+        >>> ofn.close()
+        >>> srdd = sqlCtx.jsonFile(jsonFile)
+        >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
+        >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
+        ...                     {"f1": 2, "f2": "row2", "f3":{"field4":22}},
+        ...                     {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+        True
+        """
+        jschema_rdd = self._ssql_ctx.jsonFile(path)
+        return SchemaRDD(jschema_rdd, self)
+
+    def jsonRDD(self, rdd):
+        """Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}.
+           It goes through the entire dataset once to determine the schema.
+
+        >>> srdd = sqlCtx.jsonRDD(json)
+        >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
+        >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
+        ...                     {"f1": 2, "f2": "row2", "f3":{"field4":22}},
+        ...                     {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+        True
+        """
+        def func(split, iterator):
+            for x in iterator:
+                if not isinstance(x, basestring):
+                    x = unicode(x)
+                yield x.encode("utf-8")
+        keyed = PipelinedRDD(rdd, func)
+        keyed._bypass_serializer = True
+        jrdd = keyed._jrdd.map(self._jvm.BytesToString())
+        jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd())
+        return SchemaRDD(jschema_rdd, self)
+
     def sql(self, sqlQuery):
         """Return a L{SchemaRDD} representing the result of the given query.
 
@@ -265,7 +312,7 @@ class SchemaRDD(RDD):
 
     For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the
     L{SchemaRDD} is not operated on directly, as its underlying
-    implementation is a RDD composed of Java objects. Instead it is
+    implementation is an RDD composed of Java objects. Instead it is
     converted to a PythonRDD in the JVM, on which Python operations can
     be done.
     """
@@ -337,6 +384,14 @@ class SchemaRDD(RDD):
         """Creates a new table with the contents of this SchemaRDD."""
         self._jschema_rdd.saveAsTable(tableName)
 
+    def schemaString(self):
+        """Returns the output schema in the tree format."""
+        return self._jschema_rdd.schemaString()
+
+    def printSchema(self):
+        """Prints out the schema in the tree format."""
+        print self.schemaString()
+
     def count(self):
         """Return the number of elements in this RDD.
 
@@ -436,6 +491,11 @@ def _test():
     globs['sqlCtx'] = SQLContext(sc)
     globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
         {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
+    jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
+       '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
+       '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
+    globs['jsonStrings'] = jsonStrings
+    globs['json'] = sc.parallelize(jsonStrings)
     globs['nestedRdd1'] = sc.parallelize([
         {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
         {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
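The docstrings above say that jsonFile and jsonRDD go through the entire dataset once to determine the schema. A rough plain-Python sketch of what a single-pass inference could look like, using the jsonStrings test data from this diff. This is not the JsonRDD implementation (that code is not shown here); the function names, the dict-based schema representation, and the fallback to StringType on conflicting types are all assumptions made for illustration, and arrays are left out.

```python
import json


def infer_type(value):
    """Map a parsed JSON value to a type name, or to a nested dict for objects."""
    if value is None:
        return "NullType"
    if isinstance(value, dict):
        return {key: infer_type(val) for key, val in value.items()}
    if isinstance(value, bool):  # check bool before int: bool is an int subclass
        return "BooleanType"
    if isinstance(value, int):
        return "IntegerType"
    if isinstance(value, float):
        return "DoubleType"
    if isinstance(value, str):
        return "StringType"
    raise TypeError("unsupported JSON value in this sketch: %r" % (value,))


def merge(left, right):
    """Merge two inferred types seen for the same field."""
    if left == right:
        return left
    if left == "NullType":
        return right
    if right == "NullType":
        return left
    if isinstance(left, dict) and isinstance(right, dict):
        merged = dict(left)
        for key, val in right.items():
            merged[key] = merge(merged[key], val) if key in merged else val
        return merged
    return "StringType"  # assumption: conflicting types fall back to strings


def infer_schema(json_strings):
    """Fold every record into one schema in a single pass over the data."""
    schema = {}
    for text in json_strings:
        schema = merge(schema, infer_type(json.loads(text)))
    return schema


json_strings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
                '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
                '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
print(infer_schema(json_strings))
```

Because every record is folded into the running schema, a field that appears in only some records still ends up in the result.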

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/pom.xml
----------------------------------------------------------------------
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index d3f65de..fc6d3d7 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -65,6 +65,34 @@
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
       </plugin>
+
+      <!--
+           This plugin forces the generation of a jar containing catalyst test classes,
+           so that the test classes of external modules can use them. The two execution profiles
+           are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally,
+           'mvn compile' should not compile test classes and therefore should not need this.
+           However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
+           causes the compilation to fail if catalyst test-jar is not generated. Hence, the
+           second execution profile for 'mvn compile'.
+      -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+            <execution>
+                <goals>
+                    <goal>test-jar</goal>
+                </goals>
+            </execution>
+            <execution>
+                <id>test-jar-on-compile</id>
+                <phase>compile</phase>
+                <goals>
+                    <goal>test-jar</goal>
+                </goals>
+            </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index d291814..66bff66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -22,6 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types._
 
+object HiveTypeCoercion {
+  // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
+  // The conversion for integral and floating point types have a linear widening hierarchy:
+  val numericPrecedence =
+    Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
+  // Boolean is only wider than Void
+  val booleanPrecedence = Seq(NullType, BooleanType)
+  val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
+}
+
 /**
  * A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
  * participate in operations into compatible ones.  Most of these rules are based on Hive semantics,
@@ -116,19 +126,18 @@ trait HiveTypeCoercion {
    *
    * Additionally, all types when UNION-ed with strings will be promoted to strings.
    * Other string conversions are handled by PromoteStrings.
+   *
+   * Widening types might result in loss of precision in the following cases:
+   * - IntegerType to FloatType
+   * - LongType to FloatType
+   * - LongType to DoubleType
    */
   object WidenTypes extends Rule[LogicalPlan] {
-    // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
-    // The conversion for integral and floating point types have a linear widening hierarchy:
-    val numericPrecedence =
-      Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
-    // Boolean is only wider than Void
-    val booleanPrecedence = Seq(NullType, BooleanType)
-    val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
 
     def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
       // Try and find a promotion rule that contains both types in question.
-      val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2))
+      val applicableConversion =
+        HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
 
       // If found return the widest common type, otherwise None
       applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
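The lookup in findTightestCommonType can be mirrored in a few lines of Python. This is a sketch only: type names are plain strings rather than Catalyst DataType objects, and the snake_case names are hypothetical. It follows the same steps as the Scala above: pick the first promotion list that contains both types, then return whichever of the two appears later in that list.

```python
# Precedence lists copied from the HiveTypeCoercion object in this diff.
NUMERIC_PRECEDENCE = ["NullType", "ByteType", "ShortType", "IntegerType",
                      "LongType", "FloatType", "DoubleType", "DecimalType"]
BOOLEAN_PRECEDENCE = ["NullType", "BooleanType"]
ALL_PROMOTIONS = [NUMERIC_PRECEDENCE, BOOLEAN_PRECEDENCE]


def find_tightest_common_type(t1, t2):
    """Return the wider of t1 and t2, or None if no promotion list holds both."""
    for promotion in ALL_PROMOTIONS:
        if t1 in promotion and t2 in promotion:
            # Keep only the two candidates, in precedence order; the last is widest.
            return [t for t in promotion if t in (t1, t2)][-1]
    return None


print(find_tightest_common_type("IntegerType", "DoubleType"))
print(find_tightest_common_type("BooleanType", "IntegerType"))
```

BooleanType and IntegerType never share a list, so that pair has no common type under this rule.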

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 8199a80..00e2d3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.catalyst.plans
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans
 import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StructField, StructType}
 
 abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
   self: PlanType with Product =>
@@ -123,4 +125,53 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
       case other => Nil
     }.toSeq
   }
+
+  protected def generateSchemaString(schema: Seq[Attribute]): String = {
+    val builder = new StringBuilder
+    builder.append("root\n")
+    val prefix = " |"
+    schema.foreach { attribute =>
+      val name = attribute.name
+      val dataType = attribute.dataType
+      dataType match {
+        case fields: StructType =>
+          builder.append(s"$prefix-- $name: $StructType\n")
+          generateSchemaString(fields, s"$prefix    |", builder)
+        case ArrayType(fields: StructType) =>
+          builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+          generateSchemaString(fields, s"$prefix    |", builder)
+        case ArrayType(elementType: DataType) =>
+          builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+        case _ => builder.append(s"$prefix-- $name: $dataType\n")
+      }
+    }
+
+    builder.toString()
+  }
+
+  protected def generateSchemaString(
+      schema: StructType,
+      prefix: String,
+      builder: StringBuilder): StringBuilder = {
+    schema.fields.foreach {
+      case StructField(name, fields: StructType, _) =>
+        builder.append(s"$prefix-- $name: $StructType\n")
+        generateSchemaString(fields, s"$prefix    |", builder)
+      case StructField(name, ArrayType(fields: StructType), _) =>
+        builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
+        generateSchemaString(fields, s"$prefix    |", builder)
+      case StructField(name, ArrayType(elementType: DataType), _) =>
+        builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
+      case StructField(name, fieldType: DataType, _) =>
+        builder.append(s"$prefix-- $name: $fieldType\n")
+    }
+
+    builder
+  }
+
+  /** Returns the output schema in the tree format. */
+  def schemaString: String = generateSchemaString(output)
+
+  /** Prints out the schema in the tree format */
+  def printSchema(): Unit = println(schemaString)
 }
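The recursion in generateSchemaString can be sketched in Python to show how the prefix grows with nesting depth. The representation is an assumption made for this sketch: a schema is a list of (name, type) pairs, a struct is itself such a list, and array types are omitted. The prefix strings match the ones in the Scala code above.

```python
def schema_string(fields):
    """Render a list of (name, type) pairs as a tree, starting with 'root'."""
    lines = ["root"]
    _append_fields(fields, " |", lines)
    return "\n".join(lines) + "\n"


def _append_fields(fields, prefix, lines):
    for name, field_type in fields:
        if isinstance(field_type, list):
            # A nested struct: print its header, then recurse with a deeper prefix.
            lines.append(f"{prefix}-- {name}: StructType")
            _append_fields(field_type, prefix + "    |", lines)
        else:
            lines.append(f"{prefix}-- {name}: {field_type}")


# Schema of the nested JSON record used in the Java example.
people2 = [("address", [("city", "StringType"), ("state", "StringType")]),
           ("name", "StringType")]
print(schema_string(people2), end="")
```

Each nesting level appends four spaces and a bar to the prefix, which gives the indentation seen in the printSchema comments of the Java example.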

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 714f018..4896f1b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
-class CombiningLimitsSuite extends OptimizerTest {
+class CombiningLimitsSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 6efc0e2..cea97c5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types._
 
@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
-class ConstantFoldingSuite extends OptimizerTest {
+class ConstantFoldingSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 1f67c80..ebb123c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.LeftOuter
-import org.apache.spark.sql.catalyst.plans.RightOuter
+import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
-class FilterPushdownSuite extends OptimizerTest {
+class FilterPushdownSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
deleted file mode 100644
index 89982d5..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerTest.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util._
-
-/**
- * Provides helper methods for comparing plans produced by optimization rules with the expected
- * result
- */
-class OptimizerTest extends FunSuite {
-
-  /**
-   * Since attribute references are given globally unique ids during analysis,
-   * we must normalize them to check if two different queries are identical.
-   */
-  protected def normalizeExprIds(plan: LogicalPlan) = {
-    val minId = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)).min
-    plan transformAllExpressions {
-      case a: AttributeReference =>
-        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
-    }
-  }
-
-  /** Fails the test if the two plans do not match */
-  protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
-    val normalized1 = normalizeExprIds(plan1)
-    val normalized2 = normalizeExprIds(plan2)
-    if (normalized1 != normalized2)
-      fail(
-        s"""
-          |== FAIL: Plans do not match ===
-          |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
-        """.stripMargin)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
index df1409f..22992fb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCaseConversionExpressionsSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules._
 
 /* Implicit conversions */
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 
-class SimplifyCaseConversionExpressionsSuite extends OptimizerTest {
+class SimplifyCaseConversionExpressionsSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
new file mode 100644
index 0000000..7e9f47e
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
+
+/**
+ * Provides helper methods for comparing plans.
+ */
+class PlanTest extends FunSuite {
+
+  /**
+   * Since attribute references are given globally unique ids during analysis,
+   * we must normalize them to check if two different queries are identical.
+   */
+  protected def normalizeExprIds(plan: LogicalPlan) = {
+    val minId = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)).min
+    plan transformAllExpressions {
+      case a: AttributeReference =>
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+    }
+  }
+
+  /** Fails the test if the two plans do not match */
+  protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
+    val normalized1 = normalizeExprIds(plan1)
+    val normalized2 = normalizeExprIds(plan2)
+    if (normalized1 != normalized2)
+      fail(
+        s"""
+          |== FAIL: Plans do not match ===
+          |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+        """.stripMargin)
+  }
+}
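The idea behind normalizeExprIds is that two plans with the same shape should compare equal even when analysis gave their attributes different ids. A small Python sketch of that idea, under loud assumptions: Attr and Plan are hypothetical stand-ins for Catalyst's AttributeReference and LogicalPlan, and ids are plain integers.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Attr:
    name: str
    expr_id: int


@dataclass(frozen=True)
class Plan:
    op: str
    attrs: Tuple[Attr, ...] = ()
    children: Tuple["Plan", ...] = ()


def all_ids(plan):
    """Collect every attribute id in the plan tree."""
    ids = [a.expr_id for a in plan.attrs]
    for child in plan.children:
        ids.extend(all_ids(child))
    return ids


def normalize_expr_ids(plan):
    """Shift all ids so the smallest becomes 0 (raises ValueError if there are none)."""
    min_id = min(all_ids(plan))

    def shift(node):
        return Plan(node.op,
                    tuple(Attr(a.name, a.expr_id - min_id) for a in node.attrs),
                    tuple(shift(child) for child in node.children))

    return shift(plan)


plan1 = Plan("Filter", (Attr("a", 10),),
             (Plan("Relation", (Attr("a", 10), Attr("b", 11))),))
plan2 = Plan("Filter", (Attr("a", 42),),
             (Plan("Relation", (Attr("a", 42), Attr("b", 43))),))
print(normalize_expr_ids(plan1) == normalize_expr_ids(plan2))
```

The two plans differ only by a constant id offset, so they compare equal after normalization even though they are unequal before it.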

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 6afed6d..21302b3 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -43,6 +43,13 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
       <version>${parquet.version}</version>
@@ -53,6 +60,11 @@
       <version>${parquet.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.3.0</version>
+    </dependency>
+    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 131c130..f7e0332 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -22,24 +22,22 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
-
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
 import org.apache.spark.sql.columnar.InMemoryRelation
-
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
-
+import org.apache.spark.sql.json._
 import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.SparkContext
 
 /**
  * :: AlphaComponent ::
@@ -53,7 +51,7 @@ import org.apache.spark.sql.parquet.ParquetRelation
 class SQLContext(@transient val sparkContext: SparkContext)
   extends Logging
   with SQLConf
-  with dsl.ExpressionConversions
+  with ExpressionConversions
   with Serializable {
 
   self =>
@@ -99,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, parquet.ParquetRelation(path))
 
   /**
+   * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group userf
+   */
+  def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0)
+
+  /**
+   * :: Experimental ::
+   */
+  @Experimental
+  def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
+    val json = sparkContext.textFile(path)
+    jsonRDD(json, samplingRatio)
+  }
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+   * [[SchemaRDD]].
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group userf
+   */
+  def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0)
+
+  /**
+   * :: Experimental ::
+   */
+  @Experimental
+  def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
+    new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
+
+  /**
    * :: Experimental ::
    * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
    * This registered table can be used as the target of future `insertInto` operations.

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 89eaba2..7c0efb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
 import java.util.{Map => JMap}
@@ -41,8 +41,10 @@ import java.util.{Map => JMap}
  * whose elements are scala case classes into a SchemaRDD.  This conversion can also be done
  * explicitly using the `createSchemaRDD` function on a [[SQLContext]].
  *
- * A `SchemaRDD` can also be created by loading data in from external sources, for example,
- * by using the `parquetFile` method on [[SQLContext]].
+ * A `SchemaRDD` can also be created by loading data in from external sources.
+ * Examples are loading data from Parquet files by using the
+ * `parquetFile` method on [[SQLContext]], and loading JSON datasets
+ * by using `jsonFile` and `jsonRDD` methods on [[SQLContext]].
  *
  * == SQL Queries ==
  * A SchemaRDD can be registered as a table in the [[SQLContext]] that was used to create it.  Once
@@ -341,14 +343,38 @@ class SchemaRDD(
    */
   def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
 
+  /**
+   * Converts a JavaRDD to a PythonRDD. It is used by pyspark.
+   */
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
-    val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
+    def rowToMap(row: Row, structType: StructType): JMap[String, Any] = {
+      val fields = structType.fields.map(field => (field.name, field.dataType))
+      val map: JMap[String, Any] = new java.util.HashMap
+      row.zip(fields).foreach {
+        case (obj, (name, dataType)) =>
+          dataType match {
+            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+            case other => map.put(name, obj)
+          }
+      }
+
+      map
+    }
+
+    // TODO: Actually, the schema of a row should be represented by a StructType instead of
+    // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
+    // construct the Map for python.
+    val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
+      field => (field.name, field.dataType))
     this.mapPartitions { iter =>
       val pickle = new Pickler
       iter.map { row =>
         val map: JMap[String, Any] = new java.util.HashMap
-        row.zip(fieldNames).foreach { case (obj, name) =>
-          map.put(name, obj)
+        row.zip(fields).foreach { case (obj, (name, dataType)) =>
+          dataType match {
+            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+            case other => map.put(name, obj)
+          }
         }
         map
       }.grouped(10).map(batched => pickle.dumps(batched.toArray))

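Editor's note on the javaToPython change above: the new rowToMap helper turns a row whose fields may themselves be structs into nested java.util.Map values before pickling. The following is a minimal, Spark-free sketch of that recursion. FieldType, Primitive and Struct are hypothetical stand-ins for Catalyst's DataType hierarchy, and rows are modelled as plain Seq[Any]. Unlike the committed code, the sketch skips recursion when a struct value is null, which is an assumption about the desired behaviour and not something the commit does.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object RowToMapSketch {
  // Hypothetical stand-ins for Catalyst's DataType hierarchy; not the real classes.
  sealed trait FieldType
  case object Primitive extends FieldType
  final case class Struct(fields: Seq[(String, FieldType)]) extends FieldType

  // Pairs each value in `row` with its field definition. A struct-typed field
  // becomes a nested map; every other value (including null) is stored as-is.
  def rowToMap(row: Seq[Any], struct: Struct): JMap[String, Any] = {
    val map = new JHashMap[String, Any]()
    row.zip(struct.fields).foreach {
      case (value, (name, nested: Struct)) if value != null =>
        map.put(name, rowToMap(value.asInstanceOf[Seq[Any]], nested))
      case (value, (name, _)) =>
        map.put(name, value)
    }
    map
  }
}
```

With a schema of (name, address(city)), the row Seq("Yin", Seq("Columbus")) yields a map whose "address" entry is itself a map containing "city".
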
http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 656be96..fe81721 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -122,4 +122,10 @@ private[sql] trait SchemaRDDLike {
   @Experimental
   def saveAsTable(tableName: String): Unit =
     sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
+
+  /** Returns the output schema in the tree format. */
+  def schemaString: String = queryExecution.analyzed.schemaString
+
+  /** Prints out the schema in the tree format. */
+  def printSchema(): Unit = println(schemaString)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 352260f..ff98422 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.sql.json.JsonRDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
 import org.apache.spark.sql.catalyst.types._
@@ -101,6 +102,25 @@ class JavaSQLContext(val sqlContext: SQLContext) {
     new JavaSchemaRDD(sqlContext, ParquetRelation(path))
 
   /**
+   * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group userf
+   */
+  def jsonFile(path: String): JavaSchemaRDD =
+    jsonRDD(sqlContext.sparkContext.textFile(path))
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+   * [[JavaSchemaRDD]].
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group userf
+   */
+  def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
+    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
+
+  /**
    * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
    * during the lifetime of this instance of SQLContext.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
new file mode 100644
index 0000000..edf8677
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.json
+
+import scala.collection.JavaConversions._
+import scala.math.BigDecimal
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.sql.Logging
+
+private[sql] object JsonRDD extends Logging {
+
+  private[sql] def inferSchema(
+      json: RDD[String],
+      samplingRatio: Double = 1.0): LogicalPlan = {
+    require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
+    val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
+    val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
+    val baseSchema = createSchema(allKeys)
+
+    createLogicalPlan(json, baseSchema)
+  }
+
+  private def createLogicalPlan(
+      json: RDD[String],
+      baseSchema: StructType): LogicalPlan = {
+    val schema = nullTypeToStringType(baseSchema)
+
+    SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))
+  }
+
+  private def createSchema(allKeys: Set[(String, DataType)]): StructType = {
+    // Resolve type conflicts
+    val resolved = allKeys.groupBy {
+      case (key, dataType) => key
+    }.map {
+      // Now, keys and types are organized in the format of
+      // key -> Set(type1, type2, ...).
+      case (key, typeSet) => {
+        val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq
+        val dataType = typeSet.map {
+          case (_, dataType) => dataType
+        }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
+
+        (fieldName, dataType)
+      }
+    }
+
+    def makeStruct(values: Seq[Seq[String]], prefix: Seq[String]): StructType = {
+      val (topLevel, structLike) = values.partition(_.size == 1)
+      val topLevelFields = topLevel.filter {
+        name => resolved.get(prefix ++ name).get match {
+          case ArrayType(StructType(Nil)) => false
+          case ArrayType(_) => true
+          case struct: StructType => false
+          case _ => true
+        }
+      }.map {
+        a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true)
+      }
+
+      val structFields: Seq[StructField] = structLike.groupBy(_(0)).map {
+        case (name, fields) => {
+          val nestedFields = fields.map(_.tail)
+          val structType = makeStruct(nestedFields, prefix :+ name)
+          val dataType = resolved.get(prefix :+ name).get
+          dataType match {
+            case array: ArrayType => Some(StructField(name, ArrayType(structType), nullable = true))
+            case struct: StructType => Some(StructField(name, structType, nullable = true))
+            // dataType is StringType means that we have resolved type conflicts involving
+            // primitive types and complex types. So, the type of name has been relaxed to
+            // StringType. Also, this field should have already been put in topLevelFields.
+            case StringType => None
+          }
+        }
+      }.flatMap(field => field).toSeq
+
+      StructType(
+        (topLevelFields ++ structFields).sortBy {
+        case StructField(name, _, _) => name
+      })
+    }
+
+    makeStruct(resolved.keySet.toSeq, Nil)
+  }
+
+  /**
+   * Returns the most general data type for two given data types.
+   */
+  private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
+    // Try and find a promotion rule that contains both types in question.
+    val applicableConversion = HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p
+      .contains(t2))
+
+    // If found return the widest common type, otherwise None
+    val returnType = applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
+
+    if (returnType.isDefined) {
+      returnType.get
+    } else {
+      // t1 or t2 is a StructType, ArrayType, or an unexpected type.
+      (t1, t2) match {
+        case (other: DataType, NullType) => other
+        case (NullType, other: DataType) => other
+        case (StructType(fields1), StructType(fields2)) => {
+          val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
+            case (name, fieldTypes) => {
+              val dataType = fieldTypes.map(field => field.dataType).reduce(
+                (type1: DataType, type2: DataType) => compatibleType(type1, type2))
+              StructField(name, dataType, true)
+            }
+          }
+          StructType(newFields.toSeq.sortBy {
+            case StructField(name, _, _) => name
+          })
+        }
+        case (ArrayType(elementType1), ArrayType(elementType2)) =>
+          ArrayType(compatibleType(elementType1, elementType2))
+        // TODO: We should use JsonObjectStringType to mark that values of field will be
+        // strings and every string is a Json object.
+        case (_, _) => StringType
+      }
+    }
+  }
+
+  private def typeOfPrimitiveValue(value: Any): DataType = {
+    value match {
+      case value: java.lang.String => StringType
+      case value: java.lang.Integer => IntegerType
+      case value: java.lang.Long => LongType
+      // Since we do not have a data type backed by BigInteger,
+      // when we see a Java BigInteger, we use DecimalType.
+      case value: java.math.BigInteger => DecimalType
+      case value: java.lang.Double => DoubleType
+      case value: java.math.BigDecimal => DecimalType
+      case value: java.lang.Boolean => BooleanType
+      case null => NullType
+      // Unexpected data type.
+      case _ => StringType
+    }
+  }
+
+  /**
+   * Returns the element type of a JSON array. We go through all elements of this array
+   * to detect any possible type conflict. We use [[compatibleType]] to resolve
+   * type conflicts. Right now, when the element of an array is another array, we
+   * treat the element as String.
+   */
+  private def typeOfArray(l: Seq[Any]): ArrayType = {
+    val elements = l.flatMap(v => Option(v))
+    if (elements.isEmpty) {
+      // If this JSON array is empty, we use NullType as a placeholder.
+      // If this array is not empty in other JSON objects, we can resolve
+      // the type after we have passed through all JSON objects.
+      ArrayType(NullType)
+    } else {
+      val elementType = elements.map {
+        e => e match {
+          case map: Map[_, _] => StructType(Nil)
+          // We have an array of arrays. If those element arrays do not have the same
+          // element types, we will return ArrayType[StringType].
+          case seq: Seq[_] =>  typeOfArray(seq)
+          case value => typeOfPrimitiveValue(value)
+        }
+      }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
+
+      ArrayType(elementType)
+    }
+  }
+
+  /**
+   * Figures out all key names and data types of values from a parsed JSON object
+   * (in the format of Map[String, Any]). When the value of a key is a JSON object, we
+   * only use a placeholder (StructType(Nil)) to mark that it should be a struct
+   * instead of getting all fields of this struct, because a field that does not appear
+   * in this JSON object can appear in other JSON objects.
+   */
+  private def allKeysWithValueTypes(m: Map[String, Any]): Set[(String, DataType)] = {
+    m.map{
+      // Quote the key with backticks to handle cases which have dots
+      // in the field name.
+      case (key, dataType) => (s"`$key`", dataType)
+    }.flatMap {
+      case (key: String, struct: Map[String, Any]) => {
+        // The value associated with the key is a JSON object.
+        allKeysWithValueTypes(struct).map {
+          case (k, dataType) => (s"$key.$k", dataType)
+        } ++ Set((key, StructType(Nil)))
+      }
+      case (key: String, array: List[Any]) => {
+        // The value associated with the key is an array.
+        typeOfArray(array) match {
+          case ArrayType(StructType(Nil)) => {
+            // The elements of this array are structs.
+            array.asInstanceOf[List[Map[String, Any]]].flatMap {
+              element => allKeysWithValueTypes(element)
+            }.map {
+              case (k, dataType) => (s"$key.$k", dataType)
+            } :+ (key, ArrayType(StructType(Nil)))
+          }
+          case ArrayType(elementType) => (key, ArrayType(elementType)) :: Nil
+        }
+      }
+      case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
+    }.toSet
+  }
+
+  /**
+   * Converts a Java Map/List to a Scala Map/List.
+   * We do not use Jackson's scala module here because
+   * DefaultScalaModule in jackson-module-scala will make
+   * the parsing very slow.
+   */
+  private def scalafy(obj: Any): Any = obj match {
+    case map: java.util.Map[String, Object] =>
+      // .map(identity) is used as a workaround of non-serializable Map
+      // generated by .mapValues.
+      // This issue is documented at https://issues.scala-lang.org/browse/SI-7005
+      map.toMap.mapValues(scalafy).map(identity)
+    case list: java.util.List[Object] =>
+      list.toList.map(scalafy)
+    case atom => atom
+  }
+
+  private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = {
+    // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
+    // ObjectMapper will not return BigDecimal when
+    // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
+    // (see NumberDeserializer.deserialize for the logic).
+    // But, we do not want to enable this feature because it will use BigDecimal
+    // for every float number, which will be slow.
+    // So, right now, we will have Infinity for those BigDecimal numbers.
+    // TODO: Support BigDecimal.
+    json.mapPartitions(iter => {
+      // When there is a key appearing multiple times (a duplicate key),
+      // the ObjectMapper will take the last value associated with this duplicate key.
+      // For example: for {"key": 1, "key":2}, we will get "key"->2.
+      val mapper = new ObjectMapper()
+      iter.map(record => mapper.readValue(record, classOf[java.util.Map[String, Any]]))
+      }).map(scalafy).map(_.asInstanceOf[Map[String, Any]])
+  }
+
+  private def toLong(value: Any): Long = {
+    value match {
+      case value: java.lang.Integer => value.asInstanceOf[Int].toLong
+      case value: java.lang.Long => value.asInstanceOf[Long]
+    }
+  }
+
+  private def toDouble(value: Any): Double = {
+    value match {
+      case value: java.lang.Integer => value.asInstanceOf[Int].toDouble
+      case value: java.lang.Long => value.asInstanceOf[Long].toDouble
+      case value: java.lang.Double => value.asInstanceOf[Double]
+    }
+  }
+
+  private def toDecimal(value: Any): BigDecimal = {
+    value match {
+      case value: java.lang.Integer => BigDecimal(value)
+      case value: java.lang.Long => BigDecimal(value)
+      case value: java.math.BigInteger => BigDecimal(value)
+      case value: java.lang.Double => BigDecimal(value)
+      case value: java.math.BigDecimal => BigDecimal(value)
+    }
+  }
+
+  private def toJsonArrayString(seq: Seq[Any]): String = {
+    val builder = new StringBuilder
+    builder.append("[")
+    var count = 0
+    seq.foreach {
+      element =>
+        if (count > 0) builder.append(",")
+        count += 1
+        builder.append(toString(element))
+    }
+    builder.append("]")
+
+    builder.toString()
+  }
+
+  private def toJsonObjectString(map: Map[String, Any]): String = {
+    val builder = new StringBuilder
+    builder.append("{")
+    var count = 0
+    map.foreach {
+      case (key, value) =>
+        if (count > 0) builder.append(",")
+        count += 1
+        builder.append(s"""\"${key}\":${toString(value)}""")
+    }
+    builder.append("}")
+
+    builder.toString()
+  }
+
+  private def toString(value: Any): String = {
+    value match {
+      case value: Map[String, Any] => toJsonObjectString(value)
+      case value: Seq[Any] => toJsonArrayString(value)
+      case value => Option(value).map(_.toString).orNull
+    }
+  }
+
+  private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any ={
+    if (value == null) {
+      null
+    } else {
+      desiredType match {
+        case ArrayType(elementType) =>
+          value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+        case StringType => toString(value)
+        case IntegerType => value.asInstanceOf[IntegerType.JvmType]
+        case LongType => toLong(value)
+        case DoubleType => toDouble(value)
+        case DecimalType => toDecimal(value)
+        case BooleanType => value.asInstanceOf[BooleanType.JvmType]
+        case NullType => null
+      }
+    }
+  }
+
+  private def asRow(json: Map[String,Any], schema: StructType): Row = {
+    val row = new GenericMutableRow(schema.fields.length)
+    schema.fields.zipWithIndex.foreach {
+      // StructType
+      case (StructField(name, fields: StructType, _), i) =>
+        row.update(i, json.get(name).flatMap(v => Option(v)).map(
+          v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
+
+      // ArrayType(StructType)
+      case (StructField(name, ArrayType(structType: StructType), _), i) =>
+        row.update(i,
+          json.get(name).flatMap(v => Option(v)).map(
+            v => v.asInstanceOf[Seq[Any]].map(
+              e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull)
+
+      // Other cases
+      case (StructField(name, dataType, _), i) =>
+        row.update(i, json.get(name).flatMap(v => Option(v)).map(
+          enforceCorrectType(_, dataType)).getOrElse(null))
+    }
+
+    row
+  }
+
+  private def nullTypeToStringType(struct: StructType): StructType = {
+    val fields = struct.fields.map {
+      case StructField(fieldName, dataType, nullable) => {
+        val newType = dataType match {
+          case NullType => StringType
+          case ArrayType(NullType) => ArrayType(StringType)
+          case struct: StructType => nullTypeToStringType(struct)
+          case other: DataType => other
+        }
+        StructField(fieldName, newType, nullable)
+      }
+    }
+
+    StructType(fields)
+  }
+
+  private def asAttributes(struct: StructType): Seq[AttributeReference] = {
+    struct.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
+  }
+
+  private def asStruct(attributes: Seq[AttributeReference]): StructType = {
+    val fields = attributes.map {
+      case AttributeReference(name, dataType, nullable) => StructField(name, dataType, nullable)
+    }
+
+    StructType(fields)
+  }
+}

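Editor's note on compatibleType in JsonRDD.scala above: schema inference reduces every type observed for a key to one "most general" type. The sketch below restates that rule on a small hypothetical type hierarchy (JsonType and its cases are not Catalyst classes). The numeric widening order Int < Long < Double < Decimal is an assumption standing in for HiveTypeCoercion.allPromotions, whose contents are not shown in this diff. As in the commit, a null type yields to the other type, structs merge field by field, arrays merge their element types, and any remaining conflict falls back to a string.

```scala
object CompatibleTypeSketch {
  // Hypothetical, simplified type model used only for this illustration.
  sealed trait JsonType
  case object NullT extends JsonType
  case object BooleanT extends JsonType
  case object IntT extends JsonType
  case object LongT extends JsonType
  case object DoubleT extends JsonType
  case object DecimalT extends JsonType
  case object StringT extends JsonType
  final case class ArrayT(element: JsonType) extends JsonType
  final case class StructT(fields: Map[String, JsonType]) extends JsonType

  // Assumed widening order for numeric types, narrowest first.
  private val numericOrder: Seq[JsonType] = Seq(IntT, LongT, DoubleT, DecimalT)

  def compatible(t1: JsonType, t2: JsonType): JsonType = (t1, t2) match {
    case (a, b) if a == b => a
    // A null observation never constrains the type.
    case (other, NullT) => other
    case (NullT, other) => other
    // Two numeric types widen to whichever comes later in the order.
    case (a, b) if numericOrder.contains(a) && numericOrder.contains(b) =>
      if (numericOrder.indexOf(a) >= numericOrder.indexOf(b)) a else b
    // Structs merge by field name; shared fields are resolved recursively.
    case (StructT(f1), StructT(f2)) =>
      val merged = (f1.keySet ++ f2.keySet).map { name =>
        val tpe = (f1.get(name), f2.get(name)) match {
          case (Some(a), Some(b)) => compatible(a, b)
          case (a, b) => a.orElse(b).get // the name exists on at least one side
        }
        name -> tpe
      }.toMap
      StructT(merged)
    case (ArrayT(e1), ArrayT(e2)) => ArrayT(compatible(e1, e2))
    // Any other conflict (for example number vs. boolean) relaxes to string.
    case _ => StringT
  }
}
```

Because the function is applied pairwise with reduce over all observed types, a key seen as an empty array in one record and as an array of doubles in another resolves to an array of doubles.
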
http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index d7f6aba..ef84ead 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql
 
-import org.scalatest.FunSuite
-
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 
-class QueryTest extends FunSuite {
+class QueryTest extends PlanTest {
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param rdd the [[SchemaRDD]] to be executed

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e22b38/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
index 9fff722..020baf0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala
@@ -22,6 +22,7 @@ import scala.beans.BeanProperty
 import org.scalatest.FunSuite
 
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.TestSQLContext
 
 // Implicits
@@ -111,4 +112,48 @@ class JavaSQLSuite extends FunSuite {
         """.stripMargin).collect.head.row ===
         Seq.fill(8)(null))
   }
+
+  test("loads JSON datasets") {
+    val jsonString =
+      """{"string":"this is a simple string.",
+          "integer":10,
+          "long":21474836470,
+          "bigInteger":92233720368547758070,
+          "double":1.7976931348623157E308,
+          "boolean":true,
+          "null":null
+      }""".replaceAll("\n", " ")
+    val rdd = javaCtx.parallelize(jsonString :: Nil)
+
+    var schemaRDD = javaSqlCtx.jsonRDD(rdd)
+
+    schemaRDD.registerAsTable("jsonTable1")
+
+    assert(
+      javaSqlCtx.sql("select * from jsonTable1").collect.head.row ===
+        Seq(BigDecimal("92233720368547758070"),
+            true,
+            1.7976931348623157E308,
+            10,
+            21474836470L,
+            null,
+            "this is a simple string."))
+
+    val file = getTempFilePath("json")
+    val path = file.toString
+    rdd.saveAsTextFile(path)
+    schemaRDD = javaSqlCtx.jsonFile(path)
+
+    schemaRDD.registerAsTable("jsonTable2")
+
+    assert(
+      javaSqlCtx.sql("select * from jsonTable2").collect.head.row ===
+        Seq(BigDecimal("92233720368547758070"),
+            true,
+            1.7976931348623157E308,
+            10,
+            21474836470L,
+            null,
+            "this is a simple string."))
+  }
 }

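Editor's note on the "loads JSON datasets" test above: the expected row lists values in a different order from the JSON text because makeStruct in JsonRDD.scala sorts the inferred fields by name. A minimal sketch of that ordering, using only the key names from the test (FieldOrderSketch is a hypothetical name, and plain string sorting is assumed to match the sortBy on field names):

```scala
object FieldOrderSketch {
  // Keys in the order they appear in the test's JSON string.
  val jsonKeys: Seq[String] =
    Seq("string", "integer", "long", "bigInteger", "double", "boolean", "null")

  // Inferred columns come out sorted by field name.
  val columnOrder: Seq[String] = jsonKeys.sorted
}
```

Here columnOrder is bigInteger, boolean, double, integer, long, null, string, which lines up with the values asserted for "select * from jsonTable1".
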
