geode-commits mailing list archives

From dschnei...@apache.org
Subject [5/5] incubator-geode git commit: GEODE-9: Imported gemfire-spark-connector from geode-1.0.0-SNAPSHOT-2.src.tar
Date Mon, 06 Jul 2015 21:27:17 GMT
GEODE-9: Imported gemfire-spark-connector from geode-1.0.0-SNAPSHOT-2.src.tar


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/85d44f04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/85d44f04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/85d44f04

Branch: refs/heads/feature/GEODE-9
Commit: 85d44f046bfcf34a835141044f99259e2539ae9d
Parents: 11d0367
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Mon Jul 6 10:39:00 2015 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Mon Jul 6 11:32:53 2015 -0700

----------------------------------------------------------------------
 gemfire-spark-connector/.gitignore              |   1 +
 gemfire-spark-connector/README.md               |  45 ++
 gemfire-spark-connector/doc/10_demos.md         |  84 +++
 gemfire-spark-connector/doc/1_building.md       |  29 +
 gemfire-spark-connector/doc/2_quick.md          | 178 ++++++
 gemfire-spark-connector/doc/3_connecting.md     |  55 ++
 gemfire-spark-connector/doc/4_loading.md        | 101 ++++
 gemfire-spark-connector/doc/5_rdd_join.md       | 237 ++++++++
 gemfire-spark-connector/doc/6_save_rdd.md       |  61 ++
 gemfire-spark-connector/doc/7_save_dstream.md   |  68 +++
 gemfire-spark-connector/doc/8_oql.md            |  60 ++
 gemfire-spark-connector/doc/9_java_api.md       | 129 ++++
 .../connector/internal/RegionMetadata.java      |  77 +++
 .../gemfirefunctions/QueryFunction.java         |  83 +++
 .../RetrieveRegionFunction.java                 | 192 ++++++
 .../RetrieveRegionMetadataFunction.java         | 102 ++++
 .../StructStreamingResultSender.java            | 203 +++++++
 .../gemfire/spark/connector/Employee.java       |  38 ++
 .../spark/connector/JavaApiIntegrationTest.java | 368 ++++++++++++
 .../gemfire/spark/connector/Portfolio.java      |  93 +++
 .../gemfire/spark/connector/Position.java       |  57 ++
 .../src/it/resources/test-regions.xml           |  32 +
 .../src/it/resources/test-retrieve-regions.xml  |  40 ++
 .../spark/connector/BasicIntegrationTest.scala  | 582 +++++++++++++++++++
 .../RDDJoinRegionIntegrationTest.scala          | 284 +++++++++
 .../RetrieveRegionIntegrationTest.scala         | 237 ++++++++
 .../gemfire/spark/connector/package.scala       |  13 +
 .../connector/testkit/GemFireCluster.scala      |  31 +
 .../spark/connector/testkit/GemFireRunner.scala | 136 +++++
 .../spark/connector/testkit/IOUtils.scala       |  78 +++
 .../spark/streaming/ManualClockHelper.scala     |  12 +
 .../spark/streaming/TestInputDStream.scala      |  28 +
 .../javaapi/GemFireJavaDStreamFunctions.java    |  44 ++
 .../GemFireJavaPairDStreamFunctions.java        |  39 ++
 .../javaapi/GemFireJavaPairRDDFunctions.java    | 203 +++++++
 .../javaapi/GemFireJavaRDDFunctions.java        | 136 +++++
 .../javaapi/GemFireJavaSQLContextFunctions.java |  33 ++
 .../GemFireJavaSparkContextFunctions.java       |  71 +++
 .../connector/javaapi/GemFireJavaUtil.java      | 105 ++++
 .../spark/connector/GemFireConnection.scala     |  51 ++
 .../spark/connector/GemFireConnectionConf.scala |  57 ++
 .../connector/GemFireConnectionManager.scala    |  15 +
 .../connector/GemFireFunctionDeployer.scala     |  65 +++
 .../connector/GemFireKryoRegistrator.scala      |  13 +
 .../connector/GemFirePairRDDFunctions.scala     | 117 ++++
 .../spark/connector/GemFireRDDFunctions.scala   |  93 +++
 .../connector/GemFireSQLContextFunctions.scala  |  26 +
 .../GemFireSparkContextFunctions.scala          |  23 +
 .../internal/DefaultGemFireConnection.scala     | 126 ++++
 .../DefaultGemFireConnectionManager.scala       |  51 ++
 .../connector/internal/LocatorHelper.scala      |  30 +
 .../StructStreamingResultCollector.scala        | 136 +++++
 .../connector/internal/oql/QueryParser.scala    |  42 ++
 .../spark/connector/internal/oql/QueryRDD.scala |  67 +++
 .../internal/oql/QueryResultCollector.scala     |  53 ++
 .../connector/internal/oql/RDDConverter.scala   |  24 +
 .../connector/internal/oql/RowBuilder.scala     |  22 +
 .../connector/internal/oql/SchemaBuilder.scala  |  57 ++
 .../internal/oql/UndefinedSerializer.scala      |  30 +
 .../connector/internal/rdd/GemFireJoinRDD.scala |  51 ++
 .../internal/rdd/GemFireOuterJoinRDD.scala      |  53 ++
 .../internal/rdd/GemFireRDDPartition.scala      |  20 +
 .../internal/rdd/GemFireRDDPartitioner.scala    |  43 ++
 .../rdd/GemFireRDDPartitionerImpl.scala         |  73 +++
 .../internal/rdd/GemFireRDDWriter.scala         |  55 ++
 .../internal/rdd/GemFireRegionRDD.scala         | 122 ++++
 .../javaapi/GemFireJavaRegionRDD.scala          |  10 +
 .../spark/connector/javaapi/JavaAPIHelper.scala |  35 ++
 .../gemfire/spark/connector/package.scala       |  43 ++
 .../streaming/GemFireDStreamFunctions.scala     |  61 ++
 .../spark/connector/streaming/package.scala     |  16 +
 .../gemfire/spark/connector/JavaAPITest.java    | 147 +++++
 .../connector/GemFireFunctionDeployerTest.scala |  42 ++
 .../DefaultGemFireConnectionManagerTest.scala   |  66 +++
 ...tStreamingResultSenderAndCollectorTest.scala | 238 ++++++++
 .../internal/oql/QueryParserTest.scala          |  67 +++
 .../connector/ConnectorImplicitsTest.scala      |  34 ++
 .../connector/GemFireConnectionConfTest.scala   |  84 +++
 .../connector/GemFireDStreamFunctionsTest.scala |  63 ++
 .../connector/GemFireRDDFunctionsTest.scala     |  99 ++++
 .../spark/connector/LocatorHelperTest.scala     |  75 +++
 .../rdd/GemFireRDDPartitionerTest.scala         | 174 ++++++
 .../connector/rdd/GemFireRegionRDDTest.scala    | 101 ++++
 .../basic-demos/src/main/java/demo/Emp.java     |  79 +++
 .../src/main/java/demo/OQLJavaDemo.java         |  43 ++
 .../src/main/java/demo/PairRDDSaveJavaDemo.java |  70 +++
 .../src/main/java/demo/RDDSaveJavaDemo.java     |  69 +++
 .../src/main/java/demo/RegionToRDDJavaDemo.java |  41 ++
 .../src/main/scala/demo/NetworkWordCount.scala  |  59 ++
 .../project/Dependencies.scala                  |  29 +
 .../project/GemFireSparkBuild.scala             |  60 ++
 gemfire-spark-connector/project/Settings.scala  |  42 ++
 .../project/build.properties                    |   1 +
 gemfire-spark-connector/project/plugins.sbt     |   8 +
 gemfire-spark-connector/scalastyle-config.xml   | 117 ++++
 95 files changed, 7853 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/.gitignore
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/.gitignore b/gemfire-spark-connector/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/gemfire-spark-connector/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/README.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/README.md b/gemfire-spark-connector/README.md
new file mode 100644
index 0000000..125cef7
--- /dev/null
+++ b/gemfire-spark-connector/README.md
@@ -0,0 +1,45 @@
+# Spark GemFire Connector
+
+Note: GemFire is now an open source project [Geode](http://projectgeode.org).
+
+Spark GemFire Connector lets you connect Spark to GemFire, expose GemFire regions as Spark
+RDDs, save Spark RDDs to GemFire, execute GemFire OQL queries in your Spark applications,
+and expose the results as DataFrames.
+
+## Features:
+ - Expose GemFire region as Spark RDD with GemFire server-side filtering
+ - RDD join and outer join GemFire region
+ - Save Spark RDD to GemFire
+ - Save DStream to GemFire
+ - Execute GemFire OQL and return DataFrame
+
+## Version and Compatibility
+| Connector | Spark | GemFire | Geode |
+|-----------|-------|---------|-------|
+| 0.5       | 1.3   | 9.0     |   ?   |   
+
+## Download
+TBD
+
+## Documentation
+ - [Building and testing](doc/1_building.md)
+ - [Quick start](doc/2_quick.md)
+ - [Connect to GemFire](doc/3_connecting.md)
+ - [Loading data from GemFire](doc/4_loading.md)
+ - [RDD Join and Outer Join GemFire Region](doc/5_rdd_join.md)
+ - [Saving RDD to GemFire](doc/6_save_rdd.md)
+ - [Saving DStream to GemFire](doc/7_save_dstream.md)
+ - [GemFire OQL](doc/8_oql.md)
+ - [Using Connector in Java](doc/9_java_api.md)
+ - [About the demos](doc/10_demos.md)
+ - [Logging](doc/logging.md)  ???
+ - [Security](doc/security.md) ???
+
+
+## Community: Reporting bugs, mailing list, contributing
+
+ (TBD)
+ 
+## License: Apache License 2.0
+
+ (TBD) 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/10_demos.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/10_demos.md b/gemfire-spark-connector/doc/10_demos.md
new file mode 100644
index 0000000..ef4ef86
--- /dev/null
+++ b/gemfire-spark-connector/doc/10_demos.md
@@ -0,0 +1,84 @@
+## About The Demos
+The Spark GemFire Connector contains basic demos in both Scala and Java
+as samples:
+
+ - Read a GemFire region into Spark as an RDD (`RegionToRDDJavaDemo.java`)
+ - Write a Spark pair RDD to GemFire (`PairRDDSaveJavaDemo.java`)
+ - Write a Spark non-pair RDD to GemFire (`RDDSaveJavaDemo.java`)
+ - Read an OQL query result as a Spark DataFrame (`OQLJavaDemo.java`)
+ - Network stateful word count (`NetworkWordCount.scala`)
+
+### Requirements
+Running the demos requires a GemFire cluster, which can be a single-node
+or multi-node cluster.
+
+Here are the commands to start a two-node GemFire cluster on localhost.
+First, set up environment variables:
+```
+export JAVA_HOME=<path to JAVA installation>
+export GEMFIRE=<path to GemFire installation>
+export CONNECTOR=<path to Connector project>
+export CLASSPATH=$CLASSPATH:$GEMFIRE/lib/locator-dependencies.jar:$GEMFIRE/lib/server-dependencies.jar:$GEMFIRE/lib/gfsh-dependencies.jar
+export PATH=$PATH:$GEMFIRE/bin
+export GF_JAVA=$JAVA_HOME/bin/java
+```
+
+Now run `gfsh` and execute the commands:
+```
+$ cd <path to test GemFire cluster instance location>
+$ mkdir locator server1 server2
+$ gfsh
+gfsh> start locator --name=locator
+gfsh> start server --name=server1 --server-port=40411
+gfsh> start server --name=server2 --server-port=40412 
+```
+
+To run the demos, you need to create the following regions
+via `gfsh`:
+```
+gfsh> create region --name=str_str_region --type=REPLICATE --key-constraint=java.lang.String --value-constraint=java.lang.String
+gfsh> create region --name=str_int_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.Integer
+```
+
+Then deploy the GemFire functions required by the Spark GemFire Connector:
+```
+gfsh> deploy --jar=<path to connector project>/gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar
+```
+
+### Run simple demos
+This section describes how to run `RDDSaveJavaDemo.java`, 
+`PairRDDSaveJavaDemo.java` and `RegionToRDDJavaDemo.java`:
+```
+export SPARK_CLASSPATH=$CONNECTOR/gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar:$GEMFIRE/lib/server-dependencies.jar
+
+cd <spark 1.3 dir>
+bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
+
+bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
+
+bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
+```
+
+### Run stateful network word count
+This demo shows how to save a DStream to GemFire. To run the demo, open three terminals:
+
+**Terminal-1**, start net cat server:
+```
+$ nc -lk 9999
+```
+
+**Terminal-2**, start word count Spark app: 
+```
+bin/spark-submit --master=local[2] --class demo.NetworkWordCount $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port
+```
+
+Switch to Terminal-1, type some words and hit `enter`, then check the word count in **Terminal-3**, where `gfsh` is connected to the GemFire cluster:
+```
+gfsh> query --query="select key, value from /str_int_region.entrySet" 
+```
+
+### Shut down the GemFire cluster at the end
+Use the following command to shut down the GemFire cluster when you are done
+with the demos:
+```
+gfsh> shutdown --include-locators=true
+```
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/1_building.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/1_building.md b/gemfire-spark-connector/doc/1_building.md
new file mode 100644
index 0000000..ece4a9c
--- /dev/null
+++ b/gemfire-spark-connector/doc/1_building.md
@@ -0,0 +1,29 @@
+## Building and Testing
+
+You will need Scala 2.10 and sbt 0.13.5 to 0.13.7.
+
+### Building Artifacts
+
+In the root directory of connector project, run:
+```
+sbt clean package
+```
+
+The following jar files will be created:
+ - `gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar`
+ - `gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar`
+ - `gemfire-spark-demos/target/scala-2.10/gemfire-spark-demos_2.10-0.5.0.jar `
+
+### Testing
+Commands to run unit and integration tests:
+```
+sbt test        // unit tests
+sbt it:test     // integration tests  
+```
+
+Integration tests start up a GemFire cluster and run Spark in local mode.
+Please make sure you've done the following before you run `sbt it:test`:
+ - run `sbt package`
+ - set the environment variable `GEMFIRE` to point to a GemFire installation.
+
+Next: [Quick Start](2_quick.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/2_quick.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/2_quick.md b/gemfire-spark-connector/doc/2_quick.md
new file mode 100644
index 0000000..b05302b
--- /dev/null
+++ b/gemfire-spark-connector/doc/2_quick.md
@@ -0,0 +1,178 @@
+## 5-Minute Quick Start Guide
+
+In this quick start guide, you will learn how to use the Spark shell to try out
+Spark GemFire Connector functionality.
+
+### Prerequisites
+
+Before you start, you should have basic knowledge of GemFire and Apache Spark. 
+Please refer to [GemFire Documentation](http://gemfire.docs.pivotal.io/latest/userguide/index.html)
+and [Spark Documentation](https://spark.apache.org/docs/latest/index.html) for
+the details. If you are new to GemFire, this 
+[tutorial](http://gemfire.docs.pivotal.io/latest/userguide/index.html#getting_started/gemfire_tutorial/chapter_overview.html)
+is a good starting point.
+
+You need two terminals to follow along: one for GemFire `gfsh` and one for the Spark shell. Set up JDK 1.7 in both of them.
+
+### GemFire `gfsh` terminal
+In this terminal, start the GemFire cluster, deploy the Connector's gemfire-functions jar, and create the demo regions.
+
+Set up environment variables:
+```
+export JAVA_HOME=<path to JAVA installation>
+export GEMFIRE=<path to GemFire installation>
+export CONNECTOR=<path to Spark GemFire Connector project (parent dir of this file)>
+export CLASSPATH=$CLASSPATH:$GEMFIRE/lib/locator-dependencies.jar:$GEMFIRE/lib/server-dependencies.jar:$GEMFIRE/lib/gfsh-dependencies.jar
+export PATH=$PATH:$GEMFIRE/bin
+export GF_JAVA=$JAVA_HOME/bin/java
+```
+
+Start GemFire cluster with 1 locator and 2 servers:
+```
+gfsh
+gfsh>start locator --name=locator1 --port=55221
+gfsh>start server --name=server1 --locators=localhost[55221] --server-port=0
+gfsh>start server --name=server2 --locators=localhost[55221] --server-port=0
+```
+
+Then create two demo regions:
+```
+gfsh>create region --name=str_str_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.String
+gfsh>create region --name=int_str_region --type=PARTITION --key-constraint=java.lang.Integer --value-constraint=java.lang.String
+```
+
+Deploy the Connector's gemfire-functions jar (`gemfire-functions_2.10-0.5.0.jar`):
+```
+gfsh>deploy --jar=<path to connector project>/gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar
+```
+
+### Spark shell terminal
+In this terminal, set up the Spark environment and start the Spark shell.
+
+Set the GemFire locator property in the Spark configuration by adding the
+following to `<spark-dir>/conf/spark-defaults.conf`:
+```
+spark.gemfire.locators=localhost[55221]
+```
+Note:
+ - if the file doesn't exist, create it.
+ - replace `localhost[55221]` with your own locator host and port.
+
+By default, the Spark shell outputs a lot of INFO logging. If you want to
+turn off INFO logging, change `log4j.rootCategory` to `WARN, console`
+in the file `<spark dir>/conf/log4j.properties`:
+```
+log4j.rootCategory=WARN, console
+```
+If the file `log4j.properties` doesn't exist, copy `log4j.properties.template`
+in the same directory to `log4j.properties` and update the file.
+
+Start spark-shell:
+```
+bin/spark-shell --master local[*] --jars $CONNECTOR/gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar,$GEMFIRE/lib/server-dependencies.jar
+```
+
+Check GemFire locator property in the Spark shell:
+```
+scala> sc.getConf.get("spark.gemfire.locators")
+res0: String = localhost[55221]
+```
+
+To enable the GemFire-specific functions, you need to import
+`io.pivotal.gemfire.spark.connector._`:
+```
+scala> import io.pivotal.gemfire.spark.connector._
+import io.pivotal.gemfire.spark.connector._
+```
+
+### Save Pair RDD to GemFire
+In the Spark shell, create a simple pair RDD and save it to GemFire:
+```
+scala> val data = Array(("1", "one"), ("2", "two"), ("3", "three"))
+data: Array[(String, String)] = Array((1,one), (2,two), (3,three))
+
+scala> val distData = sc.parallelize(data)
+distData: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:14
+
+scala> distData.saveToGemfire("str_str_region")
+15/02/17 07:11:54 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:29, took 0.341288 s
+```
+
+Verify the data is saved in GemFire using `gfsh`:
+```
+gfsh>query --query="select key,value from /str_str_region.entries"
+
+Result     : true
+startCount : 0
+endCount   : 20
+Rows       : 3
+
+key | value
+--- | -----
+1   | one
+3   | three
+2   | two
+
+NEXT_STEP_NAME : END
+```
+
+### Save Non-Pair RDD to GemFire 
+Saving a non-pair RDD to GemFire requires an extra function that converts each
+element of the RDD to a key-value pair. Here's a sample session in the Spark shell:
+```
+scala> val data2 = Array("a","ab","abc")
+data2: Array[String] = Array(a, ab, abc)
+
+scala> val distData2 = sc.parallelize(data2)
+distData2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:17
+
+scala> distData2.saveToGemfire("int_str_region", e => (e.length, e))
+[info 2015/02/17 12:43:21.174 PST <main> tid=0x1]
+...
+15/02/17 12:43:21 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:52, took 0.251194 s
+```
+
+Verify the result with `gfsh`:
+```
+gfsh>query --query="select key,value from /int_str_region.entrySet"
+
+Result     : true
+startCount : 0
+endCount   : 20
+Rows       : 3
+
+key | value
+--- | -----
+2   | ab
+3   | abc
+1   | a
+
+NEXT_STEP_NAME : END
+```
+
+### Expose GemFire Region As RDD
+The same API is used to expose both replicated and partitioned regions as RDDs.
+```
+scala> val rdd = sc.gemfireRegion[String, String]("str_str_region")
+rdd: io.pivotal.gemfire.spark.connector.rdd.GemFireRDD[String,String] = GemFireRDD[2] at RDD at GemFireRDD.scala:19
+
+scala> rdd.foreach(println)
+(1,one)
+(3,three)
+(2,two)
+
+scala> val rdd2 = sc.gemfireRegion[Int, String]("int_str_region")
+rdd2: io.pivotal.gemfire.spark.connector.rdd.GemFireRDD[Int,String] = GemFireRDD[3] at RDD at GemFireRDD.scala:19
+
+scala> rdd2.foreach(println)
+(2,ab)
+(1,a)
+(3,abc)
+```
+Note: use the correct key and value types for the region; otherwise you'll get
+a `ClassCastException`.
+
+
+Next: [Connecting to GemFire](3_connecting.md)
+
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/3_connecting.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/3_connecting.md b/gemfire-spark-connector/doc/3_connecting.md
new file mode 100644
index 0000000..bac9785
--- /dev/null
+++ b/gemfire-spark-connector/doc/3_connecting.md
@@ -0,0 +1,55 @@
+## Connecting to GemFire
+
+There are two ways to connect Spark to GemFire:
+ - Specify GemFire connection properties via `SparkConf`.
+ - Specify GemFire connection properties via `GemFireConnectionConf`.
+
+### Specify GemFire connection properties via `SparkConf`
+The only required GemFire connection property is `spark.gemfire.locators`.
+This can be specified in `<spark dir>/conf/spark-defaults.conf` or in Spark
+application code. In the following examples, we assume you want to provide
+3 extra properties: `security-client-auth-init`, `security-username`, and
+`security-password`. Note that they are prefixed with `spark.gemfire.`.
+ 
+In `<spark dir>/conf/spark-defaults.conf`:
+```
+spark.gemfire.locators=192.168.1.47[10334]
+spark.gemfire.security-client-auth-init=templates.security.UserPasswordAuthInit.create
+spark.gemfire.security-username=scott
+spark.gemfire.security-password=tiger
+```
+ 
+Or in the Spark application code:
+```
+import io.pivotal.gemfire.spark.connector._
+val sparkConf = new SparkConf()
+  .set(GemFireLocatorPropKey, "192.168.1.47[10334]")
+  .set("spark.gemfire.security-client-auth-init", "templates.security.UserPasswordAuthInit.create")
+  .set("spark.gemfire.security-username", "scott")
+  .set("spark.gemfire.security-password", "tiger")
+```
+
+After this, you can use all connector APIs without providing a `GemFireConnectionConf`.
+ 
+### Specify GemFire connection properties via `GemFireConnectionConf`
+Here's the code that creates `GemFireConnectionConf` with the same set of 
+properties as the examples above:
+```
+val props = Map("security-client-auth-init" -> "templates.security.UserPasswordAuthInit.create",
+                "security-username" -> "scott",
+                "security-password" -> "tiger")
+val connConf = GemFireConnectionConf("192.168.1.47[10334]", props)
+``` 
+
+Please note that those properties are **not** prefixed with `spark.gemfire.`.
+
+After this, you can use all connector APIs that require `GemFireConnectionConf`.
+
+### Notes about locators
+ - You can specify a locator in two formats: `host[port]` or `host:port`. For
+   example, `192.168.1.47[10334]` or `192.168.1.47:10334`.
+ - If your GemFire cluster has multiple locators, list them all, separated
+   by `,`. For example: `host1:10334,host2:10334` (see the sketch below).
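+
+A minimal sketch of configuring multiple locators, assuming the hypothetical hosts
+`host1` and `host2` (either the `SparkConf` property or a `GemFireConnectionConf`
+can carry the list):
+```
+import org.apache.spark.SparkConf
+import io.pivotal.gemfire.spark.connector._
+
+// hypothetical two-locator cluster; replace host1/host2 with real hosts
+val sparkConf = new SparkConf()
+  .set(GemFireLocatorPropKey, "host1:10334,host2:10334")
+
+// or, equivalently, build a GemFireConnectionConf with the same locator list
+val connConf = GemFireConnectionConf("host1:10334,host2:10334", Map.empty[String, String])
+```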
+
+
+Next: [Loading Data from GemFire](4_loading.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/4_loading.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/4_loading.md b/gemfire-spark-connector/doc/4_loading.md
new file mode 100644
index 0000000..fb03660
--- /dev/null
+++ b/gemfire-spark-connector/doc/4_loading.md
@@ -0,0 +1,101 @@
+## Loading Data from GemFire
+
+To expose the full data set of a GemFire region as a Spark
+RDD, call the `gemfireRegion` method on the SparkContext object.
+
+```
+val rdd = sc.gemfireRegion("region path")
+```
+
+Or with a specific `GemFireConnectionConf` object instance (see
+[Connecting to GemFire](3_connecting.md) for how to create a GemFireConnectionConf):
+```
+val rdd = sc.gemfireRegion("region path", connConf)
+```
+
+## GemFire RDD Partitions
+
+GemFire has two region types: **replicated** and
+**partitioned**. A replicated region has the full dataset on
+each server, while a partitioned region has its dataset spread
+across multiple servers, and may have redundant copies for high
+availability.
+
+Since a replicated region has its full dataset available on every
+server, there is only one RDD partition for a `GemFireRegionRDD` that
+represents a replicated region.
+
+For a `GemFireRegionRDD` that represents a partitioned region, there are
+many potential ways to create RDD partitions. So far, we have
+implemented `ServerSplitsPartitioner`, which splits the bucket set
+on each GemFire server into two RDD partitions by default.
+The number of splits is configurable; the following shows how to set
+three partitions per GemFire server:
+```
+import io.pivotal.gemfire.spark.connector._
+
+val opConf = Map(PreferredPartitionerPropKey -> ServerSplitsPartitionerName,
+                 NumberPartitionsPerServerPropKey -> "3")
+
+val rdd1 = sc.gemfireRegion[String, Int]("str_int_region", opConf = opConf)
+// or
+val rdd2 = sc.gemfireRegion[String, Int]("str_int_region", connConf, opConf)  
+```
+
+
+## GemFire Server-Side Filtering
+Server-side filtering allows exposing a partial dataset of a GemFire region
+as an RDD. This reduces the amount of data transferred from GemFire to
+Spark and speeds up processing.
+```
+val rdd = sc.gemfireRegion("<region path>").where("<where clause>")
+```
+
+The above call is translated to the OQL query `select key, value from /<region path>.entries where <where clause>`, and the query is executed for each RDD partition. Note: the RDD partitions are created the same way as described in the section above.
+
+The following demo uses the JavaBean class `Emp`, which has 5 attributes: `id`, `lname`, `fname`, `age`, and `loc`. In order to make the `Emp` class available on the GemFire servers, we need to deploy a jar file that contains only the `Emp` class. Build `emp.jar`, deploy it, and create the region `emps` in `gfsh`:
+```
+zip $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar \
+  -i "demo/Emp.class" --out $CONNECTOR/emp.jar
+  
+gfsh
+gfsh> deploy --jar=<path to connector project>/emp.jar
+gfsh> create region --name=emps --type=PARTITION 
+```
+Note: `Emp.class` is available in `basic-demos_2.10-0.5.0.jar`, but that jar file depends on many Scala and Spark classes that are not available on the GemFire servers' classpath. So use the above `zip` command to create a jar file that contains only `Emp.class`.
+
+Now, in the Spark shell, generate some random `Emp` records and save them to the region `emps` (remember to add `emp.jar` to the Spark shell classpath before starting the Spark shell):
+```
+import io.pivotal.gemfire.spark.connector._
+import scala.util.Random
+import demo.Emp
+
+val lnames = List("Smith", "Johnson", "Jones", "Miller", "Wilson", "Taylor", "Thomas", "Lee", "Green", "Parker", "Powell")
+val fnames = List("John", "James", "Robert", "Paul", "George", "Kevin", "Jason", "Jerry", "Peter", "Joe", "Alice", "Sophia", "Emma", "Emily")
+val locs = List("CA", "WA", "OR", "NY", "FL")
+def rpick(xs: List[String]): String = xs(Random.nextInt(xs.size))
+
+val d1 = (1 to 20).map(x => new Emp(x, rpick(lnames), rpick(fnames), 20+Random.nextInt(41), rpick(locs))).toArray
+val rdd1 = sc.parallelize(d1) 
+rdd1.saveToGemfire("emps", e => (e.getId, e))
+```
+
+Now create an RDD that contains all employees whose age is less than 40, and display its contents:
+```
+val rdd1s = sc.gemfireRegion("emps").where("value.getAge() < 40")
+
+rdd1s.foreach(println)
+(5,Emp(5, Taylor, Robert, 32, FL))
+(14,Emp(14, Smith, Jason, 28, FL))
+(7,Emp(7, Jones, Robert, 26, WA))
+(17,Emp(17, Parker, John, 20, WA))
+(2,Emp(2, Thomas, Emily, 22, WA))
+(10,Emp(10, Lee, Alice, 31, OR))
+(4,Emp(4, Wilson, James, 37, CA))
+(15,Emp(15, Powell, Jason, 34, NY))
+(3,Emp(3, Lee, Sophia, 32, OR))
+(9,Emp(9, Johnson, Sophia, 25, OR))
+(6,Emp(6, Miller, Jerry, 30, NY))
+```
+
+Next: [RDD Join and Outer Join GemFire Region](5_rdd_join.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/5_rdd_join.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/5_rdd_join.md b/gemfire-spark-connector/doc/5_rdd_join.md
new file mode 100644
index 0000000..edc86e8
--- /dev/null
+++ b/gemfire-spark-connector/doc/5_rdd_join.md
@@ -0,0 +1,237 @@
+## RDD Join and Outer Join GemFire Region
+
+The Spark GemFire Connector supports using any RDD as a source
+of a join and outer join with a GemFire region through the APIs
+`joinGemfireRegion[K, V]` and `outerJoinGemfireRegion[K, V]`.
+Those two APIs execute a single `region.getAll` call for every
+partition of the source RDD, so no unnecessary data is requested
+or transferred. This means a join or outer join between any RDD and
+a GemFire region can be performed without a full region scan, and the
+source RDD's partitioning and placement are used for data locality.
+
+Please note that the two type parameters `[K, V]` are the key/value types
+of the region entries; they need to be specified so that the result RDD
+has the correct type.
+
+The region `emps` that is created and populated in 
+[GemFire Server-Side Filtering](4_loading.md) will be used in the
+following examples.
+
+### RDD[(K, V1)] join and outer join Region[K, V2]
+
+In this case, the source RDD is a pair RDD, and it has the same key
+type as the region. Use the APIs `rdd.joinGemfireRegion[K, V2](regionPath)` and
+`rdd.outerJoinGemfireRegion[K, V2](regionPath)` to do the join and outer
+join.
+
+Prepare a source RDD `rdd2`:
+```
+val d2 = (11 to 25).map(x => (x, s"message-$x")).toArray
+val rdd2 = sc.parallelize(d2)
+// print rdd2's content
+rdd2.foreach(println)
+(11,message-11)
+(12,message-12)
+(13,message-13)
+(14,message-14)
+(15,message-15)
+(16,message-16)
+(17,message-17)
+(18,message-18)
+(19,message-19)
+(20,message-20)
+(21,message-21)
+(22,message-22)
+(23,message-23)
+(24,message-24)
+(25,message-25)
+```
+
+Join RDD `rdd2` with region `emps`, and print out the result:
+```
+val rdd2j = rdd2.joinGemfireRegion[Int, Emp]("emps")
+
+rdd2j.foreach(println)
+((11,message-11),Emp(11, Taylor, Emma, 44, CA))
+((12,message-12),Emp(12, Taylor, Joe, 60, FL))
+((13,message-13),Emp(13, Lee, Kevin, 50, FL))
+((14,message-14),Emp(14, Smith, Jason, 28, FL))
+((15,message-15),Emp(15, Powell, Jason, 34, NY))
+((16,message-16),Emp(16, Thomas, Alice, 42, OR))
+((17,message-17),Emp(17, Parker, John, 20, WA))
+((18,message-18),Emp(18, Powell, Alice, 58, FL))
+((19,message-19),Emp(19, Taylor, Peter, 46, FL))
+((20,message-20),Emp(20, Green, Peter, 57, CA))
+```
+Note that there are no pairs in the result RDD `rdd2j` corresponding to
+the pairs with ids from 21 to 25 in RDD `rdd2`, since no region
+entries have those key values.
+
+Outer join RDD `rdd2` with region `emps`, and print out the result:
+```
+val rdd2o = rdd2.outerJoinGemfireRegion[Int, Emp]("emps")
+
+rdd2o.foreach(println)
+((18,message-18),Some(Emp(18, Powell, Alice, 58, FL)))
+((19,message-19),Some(Emp(19, Taylor, Peter, 46, FL)))
+((11,message-11),Some(Emp(11, Taylor, Emma, 44, CA)))
+((12,message-12),Some(Emp(12, Taylor, Joe, 60, FL)))
+((20,message-20),Some(Emp(20, Green, Peter, 57, CA)))
+((21,message-21),None)
+((22,message-22),None)
+((23,message-23),None)
+((24,message-24),None)
+((25,message-25),None)
+((13,message-13),Some(Emp(13, Lee, Kevin, 50, FL)))
+((14,message-14),Some(Emp(14, Smith, Jason, 28, FL)))
+((15,message-15),Some(Emp(15, Powell, Jason, 34, NY)))
+((16,message-16),Some(Emp(16, Thomas, Alice, 42, OR)))
+((17,message-17),Some(Emp(17, Parker, John, 20, WA)))
+```
+Note that there are pairs in the result RDD `rdd2o` corresponding to
+the pairs with ids from 21 to 25 in RDD `rdd2`, but their values are `None`,
+since no region entries have those key values.
+
+### RDD[(K1, V1)] join and outer join Region[K2, V2]
+
+In this case, the source RDD is still a pair RDD, but it has a different
+key type. Use the APIs `rdd.joinGemfireRegion[K2, V2](regionPath, func)` and
+`rdd.outerJoinGemfireRegion[K2, V2](regionPath, func)` to do the join and
+outer join, where `func` is the function that generates the key from each
+(k, v) element of the source RDD, to join with the GemFire region.
+
+Prepare a source RDD `rdd3`:
+```
+val d3 = (11 to 25).map(x => (s"message-$x", x)).toArray
+val rdd3 = sc.parallelize(d3)
+// print rdd3's content
+rdd3.foreach(println)
+(message-18,18)
+(message-19,19)
+(message-11,11)
+(message-20,20)
+(message-21,21)
+(message-22,22)
+(message-12,12)
+(message-23,23)
+(message-24,24)
+(message-25,25)
+(message-13,13)
+(message-14,14)
+(message-15,15)
+(message-16,16)
+(message-17,17)
+```
+
+Join RDD `rdd3` (RDD[(String, Int)]) with region `emps` (Region[Int, Emp]), and print out the result:
+```
+val rdd3j = rdd3.joinGemfireRegion[Int, Emp]("emps", pair => pair._2)
+
+rdd3j.foreach(println)
+((message-18,18),Emp(18, Powell, Alice, 58, FL))
+((message-19,19),Emp(19, Taylor, Peter, 46, FL))
+((message-20,20),Emp(20, Green, Peter, 57, CA))
+((message-11,11),Emp(11, Taylor, Emma, 44, CA))
+((message-12,12),Emp(12, Taylor, Joe, 60, FL))
+((message-13,13),Emp(13, Lee, Kevin, 50, FL))
+((message-14,14),Emp(14, Smith, Jason, 28, FL))
+((message-15,15),Emp(15, Powell, Jason, 34, NY))
+((message-16,16),Emp(16, Thomas, Alice, 42, OR))
+((message-17,17),Emp(17, Parker, John, 20, WA))
+```
+Note that `pair => pair._2` means the 2nd element of each source RDD
+element is used as the join key.
+
+Outer join RDD `rdd3` with region `emps`, and print out the result:
+```
+val rdd3o = rdd3.outerJoinGemfireRegion[Int, Emp]("emps", pair => pair._2)
+
+rdd3o.foreach(println)
+((message-18,18),Some(Emp(18, Powell, Alice, 58, FL)))
+((message-11,11),Some(Emp(11, Taylor, Emma, 44, CA)))
+((message-19,19),Some(Emp(19, Taylor, Peter, 46, FL)))
+((message-12,12),Some(Emp(12, Taylor, Joe, 60, FL)))
+((message-20,20),Some(Emp(20, Green, Peter, 57, CA)))
+((message-13,13),Some(Emp(13, Lee, Kevin, 50, FL)))
+((message-21,21),None)
+((message-14,14),Some(Emp(14, Smith, Jason, 28, FL)))
+((message-22,22),None)
+((message-23,23),None)
+((message-24,24),None)
+((message-25,25),None)
+((message-15,15),Some(Emp(15, Powell, Jason, 34, NY)))
+((message-16,16),Some(Emp(16, Thomas, Alice, 42, OR)))
+((message-17,17),Some(Emp(17, Parker, John, 20, WA)))
+```
+
+### RDD[T] join and outer join Region[K, V]
+
+Use the APIs `rdd.joinGemfireRegion[K, V](regionPath, func)` and
+`rdd.outerJoinGemfireRegion[K, V](regionPath, func)` to do the join
+and outer join, where `func` is the function that generates the key from
+`t`, the element of the source RDD, to join with the GemFire region.
+
+Prepare a source RDD `rdd4`:
+```
+val d4 = (11 to 25).map(x => x * 2).toArray
+val rdd4 = sc.parallelize(d4)
+// print rdd4's content
+rdd4.foreach(println)
+22
+24
+36
+38
+40
+42
+44
+46
+26
+28
+48
+30
+32
+34
+50
+```
+
+Join RDD `rdd4` with region `emps`, and print out the result:
+```
+val rdd4j = rdd4.joinGemfireRegion[Int, Emp]("emps", x => x/2)
+
+rdd4j.foreach(println)
+(22,Emp(11, Taylor, Emma, 44, CA))
+(24,Emp(12, Taylor, Joe, 60, FL))
+(26,Emp(13, Lee, Kevin, 50, FL))
+(28,Emp(14, Smith, Jason, 28, FL))
+(30,Emp(15, Powell, Jason, 34, NY))
+(32,Emp(16, Thomas, Alice, 42, OR))
+(34,Emp(17, Parker, John, 20, WA))
+(36,Emp(18, Powell, Alice, 58, FL))
+(38,Emp(19, Taylor, Peter, 46, FL))
+(40,Emp(20, Green, Peter, 57, CA))
+```
+
+Outer join RDD `rdd4` with region `emps`, and print out the result:
+```
+val rdd4o = rdd4.outerJoinGemfireRegion[Int, Emp]("emps", x => x/2)
+
+rdd4o.foreach(println)
+(36,Some(Emp(18, Powell, Alice, 58, FL)))
+(38,Some(Emp(19, Taylor, Peter, 46, FL)))
+(40,Some(Emp(20, Green, Peter, 57, CA)))
+(42,None)
+(44,None)
+(46,None)
+(48,None)
+(50,None)
+(22,Some(Emp(11, Taylor, Emma, 44, CA)))
+(24,Some(Emp(12, Taylor, Joe, 60, FL)))
+(26,Some(Emp(13, Lee, Kevin, 50, FL)))
+(28,Some(Emp(14, Smith, Jason, 28, FL)))
+(30,Some(Emp(15, Powell, Jason, 34, NY)))
+(32,Some(Emp(16, Thomas, Alice, 42, OR)))
+(34,Some(Emp(17, Parker, John, 20, WA)))
+```
+
+
+Next: [Saving RDD to GemFire](6_save_rdd.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/6_save_rdd.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/6_save_rdd.md b/gemfire-spark-connector/doc/6_save_rdd.md
new file mode 100644
index 0000000..8516955
--- /dev/null
+++ b/gemfire-spark-connector/doc/6_save_rdd.md
@@ -0,0 +1,61 @@
+## Saving RDD to GemFire
+
+It is possible to save any RDD to a GemFire region. The requirements are:
+ - the class of the elements contained by the RDD is
+   (1) available on the classpath of the GemFire servers, and
+   (2) serializable.
+ - the target region exists.
+
+To save an RDD to an existing GemFire region, import 
+`io.pivotal.gemfire.spark.connector._` and call the `saveToGemfire` 
+method on RDD.
+
+### Save RDD[(K, V)] to GemFire
+For a pair RDD, i.e., RDD[(K, V)], each pair is treated as a key/value pair.
+```
+val data = Array(("1","one"),("2","two"),("3","three"))
+val rdd = sc.parallelize(data)
+rdd.saveToGemfire("str_str_region")
+```
+
+If you created a GemFireConnectionConf as described in
+[Connecting to GemFire](3_connecting.md), the last statement becomes:
+```
+rdd.saveToGemfire("str_str_region", connConf)
+```
+
+You can verify the region contents:
+```
+gfsh>query --query="select key, value from /str_str_region.entrySet"
+
+Result     : true
+startCount : 0
+endCount   : 20
+Rows       : 3
+
+key | value
+--- | -----
+1   | one
+3   | three
+2   | two
+```
+
+Note that GemFire regions require unique keys, so if the pair RDD
+contains duplicated keys, pairs with the same key overwrite each other
+and only one of them appears in the final dataset, as the sketch below illustrates.
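+
+A minimal Spark shell sketch (assuming the `str_str_region` created earlier):
+```
+// two pairs share the key "1"; only one of them survives in the region
+val dup = sc.parallelize(Array(("1", "first"), ("1", "second"), ("2", "two")))
+dup.saveToGemfire("str_str_region")
+// the region ends up with a single entry for key "1" (either "first" or "second")
+// and one entry for key "2"
+```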
+
+### Save RDD[T] to GemFire
+To save a non-pair RDD to GemFire, a function that converts each RDD
+element `T` into a key/value pair is required; each element is converted
+with that function, and the resulting pair is saved to GemFire.
+
+```
+val data2 = Array("a","ab","abc")
+val rdd2 = sc.parallelize(data2)
+rdd2.saveToGemfire("str_int_region", e => (e, e.length))
+// or use GemFireConnectionConf object directly
+// rdd2.saveToGemfire("str_int_region", e => (e, e.length), connConf)
+```
+
+ 
+Next: [Saving DStream to GemFire](7_save_dstream.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/7_save_dstream.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/7_save_dstream.md b/gemfire-spark-connector/doc/7_save_dstream.md
new file mode 100644
index 0000000..17903fa
--- /dev/null
+++ b/gemfire-spark-connector/doc/7_save_dstream.md
@@ -0,0 +1,68 @@
+## Saving DStream to GemFire
+Spark Streaming extends the core API to allow high-throughput, fault-tolerant
+stream processing of live data streams.  Data can be ingested from many 
+sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. 
+Results can be stored in GemFire.
+
+### A Simple Spark Streaming App: Stateful Network Word Count
+
+Create a `StreamingContext` with a `SparkConf` configuration
+```
+val ssc = new StreamingContext(sparkConf, Seconds(1))
+```
+
+Create a DStream that will connect to net cat server `host:port`
+```
+val lines = ssc.socketTextStream(host, port)
+```
+
+Count each word in each batch
+```
+val words = lines.flatMap(_.split(" "))
+val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
+```
+
+Use `updateStateByKey` to maintain a running count of each word seen in a text
+data stream. Here, the running count is the state and it is an integer. We 
+define the update function as
+```
+val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+  val currentCount = values.foldLeft(0)(_ + _)
+  val previousCount = state.getOrElse(0)
+  Some(currentCount + previousCount)
+}
+```
+
+This is applied on a DStream containing words (say, the pairs DStream containing
+`(word, 3)` pairs in the earlier example):
+```
+val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)
+```
+
+Print a few of the counts to the console. Start the computation.
+```
+runningCounts.print()
+ssc.start()
+ssc.awaitTermination() // Wait for the computation to terminate
+```
+
+#### Spark Streaming With GemFire
+Now let's save the running word count to the GemFire region `str_int_region`,
+by simply replacing `print()` with `saveToGemfire()`:
+
+```
+import io.pivotal.gemfire.spark.connector.streaming._
+runningCounts.saveToGemfire("str_int_region")
+```
+
+You can use the version of `saveToGemfire` that takes a `GemFireConnectionConf` parameter:
+```
+runningCounts.saveToGemfire("str_int_region", connConf)
+```
+
+See the [Spark Streaming Programming Guide](http://spark.apache.org/docs/latest/streaming-programming-guide.html)
+for more details about Spark Streaming programming.
+
+
+Next: [GemFire OQL](8_oql.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/8_oql.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/8_oql.md b/gemfire-spark-connector/doc/8_oql.md
new file mode 100644
index 0000000..bad1f3c
--- /dev/null
+++ b/gemfire-spark-connector/doc/8_oql.md
@@ -0,0 +1,60 @@
+## GemFire OQL Query
+Spark GemFire Connector lets us run GemFire OQL queries in Spark applications
+to retrieve data from GemFire. The query result is a Spark DataFrame. Note 
+that as of Spark 1.3, SchemaRDD is deprecated. Spark GemFire Connector does
+not support SchemaRDD.
+
+An instance of `SQLContext` is required to run an OQL query.
+```
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+```
+
+Create a `DataFrame` using OQL:
+```
+val dataFrame = sqlContext.gemfireOQL("SELECT * FROM /CustomerRegion WHERE status = 'active'")
+```
+
+You can repartition the `DataFrame` using `DataFrame.repartition()` if needed. 
+Once you have the `DataFrame`, you can register it as a table and use Spark 
+SQL to query it:
+```
+dataFrame.registerTempTable("customer")
+val SQLResult = sqlContext.sql("SELECT * FROM customer WHERE id > 100")
+```
+
+## Serialization
+If the OQL query involves a User Defined Type (UDT), and the default Java
+serializer is used, then the UDT on GemFire must implement `java.io.Serializable`.
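+
+A minimal sketch, with a hypothetical `Customer` UDT (not part of the connector):
+```
+// hypothetical domain class stored in a GemFire region and returned by OQL;
+// it must be serializable when the default Java serializer is used
+class Customer(val id: Int, val status: String) extends Serializable
+```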
+
+If KryoSerializer is preferred, as described in the
+[Spark Documentation](https://spark.apache.org/docs/latest/tuning.html), you can configure
+`SparkConf` as in the following example:
+```
+val conf = new SparkConf()
+  .setAppName("MySparkApp")
+  .setMaster("local[*]")
+  .set(GemFireLocatorPropKey, "localhost[55221]")
+  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  .set("spark.kryo.registrator", "io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator")
+```
+
+and register the classes (optional)
+```
+conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
+```
+
+Use the following options to start Spark shell:
+```
+ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
+ --conf spark.kryo.registrator=io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator
+```
+
+## References
+[GemFire OQL Documentation](http://gemfire.docs.pivotal.io/latest/userguide/index.html#developing/querying_basics/chapter_overview.html)
+
+[GemFire OQL Examples](http://gemfire.docs.pivotal.io/latest/userguide/index.html#getting_started/quickstart_examples/querying.html)
+
+[Spark SQL Documentation](https://spark.apache.org/docs/latest/sql-programming-guide.html)
+
+
+Next: [Using Connector in Java](9_java_api.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/9_java_api.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/9_java_api.md b/gemfire-spark-connector/doc/9_java_api.md
new file mode 100644
index 0000000..f13fb9b
--- /dev/null
+++ b/gemfire-spark-connector/doc/9_java_api.md
@@ -0,0 +1,129 @@
+## Using Connector in Java
+This section describes how to access the functionality of the Spark GemFire
+Connector when you write your Spark applications in Java. It is assumed
+that you have already familiarized yourself with the previous sections and
+understand how the Spark GemFire Connector works.
+
+### Prerequisites
+The best way to use the Spark GemFire Connector Java API is to statically
+import all of the methods in `GemFireJavaUtil`. This utility class is
+the main entry point for Spark GemFire Connector Java API.
+```
+import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
+```
+
+Create JavaSparkContext (don't forget about the static import):
+```
+SparkConf conf = new SparkConf();
+conf.set(GemFireLocatorPropKey, "192.168.1.47[10334]");
+JavaSparkContext jsc = new JavaSparkContext(conf);
+```
+
+### Accessing GemFire region in Java
+A GemFire region is exposed as a `GemFireJavaRegionRDD<K,V>` (a subclass of
+`JavaPairRDD<K, V>`):
+```
+GemFireJavaRegionRDD<Integer, Emp> rdd1 = javaFunctions(jsc).gemfireRegion("emps");
+GemFireJavaRegionRDD<Integer, Emp> rdd2 = rdd1.where("value.getAge() < 40");
+```
+
+### RDD Join and Outer Join
+Use the `rdd3` and region `emps` from [join and outer join examples](5_rdd_join.md):
+```
+static class MyKeyFunction implements Function<Tuple2<String, Integer>, Integer> {
+  @Override public Integer call(Tuple2<String, Integer> pair) throws Exception {
+    return pair._2();
+  }
+}
+
+MyKeyFunction func = new MyKeyFunction();
+
+JavaPairRDD<Tuple2<String, Integer>, Emp> rdd3j =
+  javaFunctions(rdd3).joinGemfireRegion("emps", func);
+
+JavaPairRDD<Tuple2<String, Integer>, Option<Emp>> rdd3o =
+  javaFunctions(rdd3).outerJoinGemfireRegion("emps", func);
+
+```
+
+### Saving JavaPairRDD to GemFire
+Saving JavaPairRDD is straightforward:
+```
+List<Tuple2<String, String>> data = new ArrayList<>();
+data.add(new Tuple2<>("7", "seven"));
+data.add(new Tuple2<>("8", "eight"));
+data.add(new Tuple2<>("9", "nine"));
+
+// create JavaPairRDD
+JavaPairRDD<String, String> rdd1 = jsc.parallelizePairs(data);
+// save to GemFire
+javaFunctions(rdd1).saveToGemfire("str_str_region");
+```
+
+In order to save `JavaRDD<Tuple2<K,V>>`, it needs to be converted to 
+`JavaPairRDD<K,V>` via static method `toJavaPairRDD` from `GemFireJavaUtil`:
+```
+List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
+data2.add(new Tuple2<>("11", "eleven"));
+data2.add(new Tuple2<>("12", "twelve"));
+data2.add(new Tuple2<>("13", "thirteen"));
+
+// create JavaRDD<Tuple2<K,V>>
+JavaRDD<Tuple2<String, String>> rdd2 =  jsc.parallelize(data2);
+// save to GemFire
+javaFunctions(toJavaPairRDD(rdd2)).saveToGemfire("str_str_region");
+``` 
+
+### Saving JavaRDD to GemFire
+Similar to the Scala version, a function is required to generate a key/value pair
+from each RDD element. The following `PairFunction` generates a `<String, Integer>`
+pair from a `<String>`:
+```
+PairFunction<String, String, Integer> pairFunc =  
+  new PairFunction<String, String, Integer>() {
+    @Override public Tuple2<String, Integer> call(String s) throws Exception {
+      return new Tuple2<String, Integer>(s, s.length());
+    }
+  };
+```
+Note: there are 3 type parameters for `PairFunction`; they are:
+ 1. the type of the JavaRDD element
+ 2. the type of the key of the output key/value pair
+ 3. the type of the value of the output key/value pair
+
+Once `PairFunction` is ready, the rest is easy:
+```
+// create demo JavaRDD<String>
+List<String> data = new ArrayList<String>();
+data.add("a");
+data.add("ab");
+data.add("abc");
+JavaRDD<String> jrdd = jsc.parallelize(data);
+
+javaFunctions(jrdd).saveToGemfire("str_int_region", pairFunc);
+```
+
+### Saving JavaPairDStream and JavaDStream
+Saving JavaPairDStream and JavaDStream is similar to saving JavaPairRDD
+and JavaRDD:
+```
+JavaPairDStream<String, String> ds1 = ...
+javaFunctions(ds1).saveToGemFire("str_str_region");
+
+JavaDStream<String> ds2 = ...
+javaFunctions(ds2).saveToGemFire("str_int_region", pairFunc);
+```
+
+### Using GemFire OQL
+
+There are two gemfireOQL Java APIs, with and without GemFireConnectionConf.
+Here is an example without GemFireConnectionConf; it uses the default
+GemFireConnectionConf internally.
+```
+// assume there's jsc: JavaSparkContext
+SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc);
+DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region");
+df.show();
+```
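+
+A sketch of the variant that takes a `GemFireConnectionConf` explicitly (the exact
+parameter order is an assumption; `connConf` is assumed to have been created as in
+[Connecting to GemFire](3_connecting.md)):
+```
+// assumes a GemFireConnectionConf instance named connConf already exists
+DataFrame df2 = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region", connConf);
+df2.show();
+```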
+
+Next: [About The Demos](10_demos.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java
new file mode 100644
index 0000000..55fe7e5
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java
@@ -0,0 +1,77 @@
+package io.pivotal.gemfire.spark.connector.internal;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.io.Serializable;
+
+/**
+ * This class contains all info required by GemFire RDD partitioner to create partitions.
+ */
+public class RegionMetadata implements Serializable {
+
+  private String  regionPath;
+  private boolean isPartitioned;
+  private int     totalBuckets;
+  private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap;
+  private String  keyTypeName;
+  private String  valueTypeName;
+
+  /**
+   * Constructor.
+   * @param regionPath the full path of the given region
+   * @param isPartitioned true for partitioned region, false otherwise
+   * @param totalBuckets number of total buckets for partitioned region, ignored otherwise
+   * @param serverBucketMap gemfire server (host:port pair) to bucket set map
+   * @param keyTypeName region key class name
+   * @param valueTypeName region value class name                    
+   */
+  public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap,
+                        String keyTypeName, String valueTypeName) {
+    this.regionPath = regionPath;
+    this.isPartitioned = isPartitioned;
+    this.totalBuckets = totalBuckets;
+    this.serverBucketMap = serverBucketMap;
+    this.keyTypeName = keyTypeName;
+    this.valueTypeName = valueTypeName;
+  }
+
+  public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) {
+    this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null);
+  }
+
+  public String getRegionPath() {
+    return regionPath;
+  }
+
+  public boolean isPartitioned() {
+    return isPartitioned;
+  }
+
+  public int getTotalBuckets() {
+    return totalBuckets;
+  }
+  
+  public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() {
+    return serverBucketMap;
+  }
+
+  public String getKeyTypeName() {
+    return keyTypeName;
+  }
+
+  public String getValueTypeName() {
+    return valueTypeName;
+  }
+
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("RegionMetadata(region=").append(regionPath)
+       .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")")
+       .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets)
+       .append(", map=").append(serverBucketMap).append(")");
+    return buf.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java
new file mode 100644
index 0000000..0f4ef1d
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java
@@ -0,0 +1,83 @@
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.execute.*;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+import java.util.Iterator;
+
+public class QueryFunction implements Function {
+
+  private static final long serialVersionUID = 4866641340803692882L;
+
+  public final static String ID = "gemfire-spark-query-function";
+
+  private final static QueryFunction instance = new QueryFunction();
+
+  private static final Logger logger = LogService.getLogger();
+
+  private static final int CHUNK_SIZE = 1024;
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  public static QueryFunction getInstance() {
+    return instance;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
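+    // args[0] is the OQL query string, args[1] a bucket-set description used for logging;
+    // the query runs against this member's data and the serialized rows are streamed
+    // back to the caller in roughly CHUNK_SIZE-byte chunks.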
+    try {
+      String[] args = (String[]) context.getArguments();
+      String queryString = args[0];
+      String bucketSet = args[1];
+      InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
+      LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+      boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+      Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString);
+      Object result = partitioned ? query.execute((InternalRegionFunctionContext) context) : query.execute();
+      ResultSender<Object> sender = context.getResultSender();
+      HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null);
+      Iterator<Object> iter = ((SelectResults) result).asList().iterator();
+      while (iter.hasNext()) {
+        Object row = iter.next();
+        DataSerializer.writeObject(row, buf);
+        if (buf.size() > CHUNK_SIZE) {
+          sender.sendResult(buf.toByteArray());
+          logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size());
+          buf.reset();
+        }
+      }
+      sender.lastResult(buf.toByteArray());
+      logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size());
+      buf.reset();
+    }
+    catch(Exception e) {
+      throw new FunctionException(e);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java
new file mode 100644
index 0000000..50c35c5
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java
@@ -0,0 +1,192 @@
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
+
+import java.util.Iterator;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.internal.cache.*;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import com.gemstone.gemfire.internal.cache.execute.InternalResultSender;
+import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * GemFire function that is used by `SparkContext.gemfireRegion(regionPath, whereClause)`
+ * to retrieve the region data set for the given bucket set as an RDD partition.
+ **/
+public class RetrieveRegionFunction implements Function {
+
+  public final static String ID = "spark-gemfire-retrieve-region";
+  private static final Logger logger = LogService.getLogger();
+  private static final RetrieveRegionFunction instance = new RetrieveRegionFunction();
+
+  public RetrieveRegionFunction() {
+  }
+
+  /** ------------------------------------------ */
+  /**     interface Function implementation      */
+  /** ------------------------------------------ */
+
+  public static RetrieveRegionFunction getInstance() {
+    return instance;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    String[] args = (String[]) context.getArguments();
+    String where = args[0];
+    String taskDesc = args[1];
+    InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
+    LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+    boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+    if (where.trim().isEmpty())
+      retrieveFullRegion(irfc, partitioned, taskDesc);
+    else
+      retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc);
+  }
+
+  /** ------------------------------------------ */
+  /**    Retrieve region data with where clause  */
+  /** ------------------------------------------ */
+
+  private void retrieveRegionWithWhereClause(
+    InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) {
+    String regionPath = localRegion.getFullPath();
+    String qstr = "select key, value from " + regionPath + ".entries where " + where;
+    logger.info(desc + ": " + qstr);
+
+    try {
+      Cache cache = CacheFactory.getAnyInstance();
+      QueryService queryService = cache.getQueryService();
+      Query query = queryService.newQuery(qstr);
+      SelectResults<Struct> results =
+        (SelectResults<Struct>) (partitioned ?  query.execute(context) : query.execute());
+
+      Iterator<Object[]> entries = getStructIteratorWrapper(results.asList().iterator());
+      InternalResultSender irs = (InternalResultSender) context.getResultSender();
+      StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
+      sender.send();
+    } catch (Exception e) {
+      throw new FunctionException(e);
+    }
+  }
+
+  private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> entries) {
+    return new WrapperIterator<Struct, Iterator<Struct>>(entries) {
+      @Override public Object[] next() {
+        return  delegate.next().getFieldValues();
+      }
+    };
+  }
+
+  /** ------------------------------------------ */
+  /**         Retrieve full region data          */
+  /** ------------------------------------------ */
+
+  private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) {
+    Iterator<Object[]> entries;
+    if (partitioned) {
+      PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>)
+              ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator();
+      // entries = getPREntryIterator(iter);
+      entries = getSimpleEntryIterator(iter);
+    } else {
+      LocalRegion owner = (LocalRegion) context.getDataSet();
+      Iterator<Region.Entry> iter = (Iterator<Region.Entry>) owner.entrySet().iterator();
+      // entries = getRREntryIterator(iter, owner);
+      entries = getSimpleEntryIterator(iter);
+    }
+    InternalResultSender irs = (InternalResultSender) context.getResultSender();
+    StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
+    sender.send();
+  }
+
+//  /** An iterator for partitioned region that uses internal API to get serialized value */
+//  private Iterator<Object[]> getPREntryIterator(PREntriesIterator<Region.Entry> iterator) {
+//    return new WrapperIterator<Region.Entry, PREntriesIterator<Region.Entry>>(iterator) {
+//      @Override public Object[] next() {
+//        Region.Entry entry = delegate.next();
+//        int bucketId = delegate.getBucketId();
+//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId);
+//        // owner needs to be the bucket region not the enclosing partition region
+//        LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId);
+//        Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
+//        return new Object[] {keyInfo.getKey(), value};
+//      }
+//    };
+//  }
+//
+//  /** An iterator for replicated region that uses internal API to get serialized value */
+//  private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> iterator, LocalRegion region) {
+//    final LocalRegion owner = region;
+//    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
+//      @Override public Object[] next() {
+//        Region.Entry entry =  delegate.next();
+//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null);
+//        Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
+//        return new Object[] {keyInfo.getKey(), value};
+//      }
+//    };
+//  }
+
+  // TODO: compare performance of the regular and simple iterators
+  /** A general iterator for both partitioned and replicated regions that returns deserialized values */
+  private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> iterator) {
+    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
+      @Override public Object[] next() {
+        Region.Entry entry = delegate.next();
+        return new Object[] {entry.getKey(), entry.getValue()};
+      }
+    };
+  }
+
+  /** ------------------------------------------ */
+  /**        abstract wrapper iterator           */
+  /** ------------------------------------------ */
+
+  /** An abstract wrapper iterator that reduces the duplicated code in the anonymous iterators above */
+  abstract class WrapperIterator<T, S extends Iterator<T>> implements Iterator<Object[]> {
+
+    final S delegate;
+
+    protected WrapperIterator(S delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override public boolean hasNext() {
+      return delegate.hasNext();
+    }
+
+    @Override public void remove() { }
+  }
+}

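Each of these functions is looked up by its fixed ID at execution time, so it must be registered on every GemFire server that hosts the target region. Deploying the gemfire-functions jar with gfsh typically auto-registers Function implementations that have a public zero-arg constructor; an equivalent manual bootstrap, sketched here with an illustrative class name, would be:

    import com.gemstone.gemfire.cache.execute.FunctionService;
    import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.QueryFunction;
    import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.RetrieveRegionFunction;
    import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.RetrieveRegionMetadataFunction;

    public class ConnectorFunctionBootstrap {
      public static void registerAll() {
        // register under the fixed IDs so clients can execute the functions by ID
        FunctionService.registerFunction(RetrieveRegionFunction.getInstance());
        FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance());
        FunctionService.registerFunction(QueryFunction.getInstance());
      }
    }

RetrieveRegionFunction itself expects two string arguments: args[0] is the where clause (an empty string triggers a full-region retrieve) and args[1] is a task description used for logging.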
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java
new file mode 100644
index 0000000..c11c231
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java
@@ -0,0 +1,102 @@
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
+
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import io.pivotal.gemfire.spark.connector.internal.RegionMetadata;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This GemFire function retrieves region metadata.
+ */
+public class RetrieveRegionMetadataFunction implements Function {
+
+  public final static String ID = "gemfire-spark-retrieve-region-metadata";
+  
+  private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction();
+
+  public RetrieveRegionMetadataFunction() {
+  }
+
+  public static RetrieveRegionMetadataFunction getInstance() {
+    return instance;    
+  }
+  
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet();
+    String regionPath = region.getFullPath();
+    boolean isPartitioned = region.getDataPolicy().withPartitioning();
+    String kTypeName = getTypeClassName(region.getAttributes().getKeyConstraint());
+    String vTypeName = getTypeClassName(region.getAttributes().getValueConstraint());
+
+    RegionMetadata metadata;
+    if (! isPartitioned) {
+      metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, vTypeName);
+    } else {
+      PartitionedRegion pregion = (PartitionedRegion) region;
+      int totalBuckets = pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets();
+      Map<Integer, List<BucketServerLocation66>> bucketMap = pregion.getRegionAdvisor().getAllClientBucketProfiles();
+      HashMap<ServerLocation, HashSet<Integer>> serverMap = bucketServerMap2ServerBucketSetMap(bucketMap);
+      metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, kTypeName, vTypeName);
+    }
+    
+    ResultSender<RegionMetadata> sender = context.getResultSender();
+    sender.lastResult(metadata);
+  }
+  
+  private String getTypeClassName(Class clazz) {
+    return clazz == null ? null : clazz.getCanonicalName();
+  }
+  
+  /** Convert a bucket-to-server-locations map into a primary-server-to-bucket-set map */
+  private HashMap<ServerLocation, HashSet<Integer>>
+    bucketServerMap2ServerBucketSetMap(Map<Integer, List<BucketServerLocation66>> map) {
+    HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new HashMap<>();
+    for (Integer id : map.keySet()) {
+      List<BucketServerLocation66> locations = map.get(id);
+      for (BucketServerLocation66 location : locations) {
+        ServerLocation server = new ServerLocation(location.getHostName(), location.getPort());
+        if (location.isPrimary()) {
+          HashSet<Integer> set = serverBucketMap.get(server);
+          if (set == null) {
+            set = new HashSet<>();
+            serverBucketMap.put(server, set);
+          }
+          set.add(id);
+          break;
+        }
+      }
+    }
+    return serverBucketMap;    
+  }
+}

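The core of the metadata function is the map inversion in bucketServerMap2ServerBucketSetMap: the bucket-to-server-profiles map is turned into a primary-server-to-bucket-set map. A stand-alone sketch of the same transformation, using plain host:port strings in place of ServerLocation and assuming, for illustration only, that the primary server is listed first for each bucket:

    import java.util.*;

    public class BucketMapInversionSketch {

      // Invert {bucketId -> [servers, primary first]} into {primaryServer -> {bucketIds}}.
      static Map<String, Set<Integer>> toServerBucketSetMap(Map<Integer, List<String>> bucketMap) {
        Map<String, Set<Integer>> serverBucketMap = new HashMap<>();
        for (Map.Entry<Integer, List<String>> entry : bucketMap.entrySet()) {
          if (entry.getValue().isEmpty()) continue;      // no server profile for this bucket yet
          String primary = entry.getValue().get(0);      // simplification: primary listed first
          Set<Integer> buckets = serverBucketMap.get(primary);
          if (buckets == null) {
            buckets = new HashSet<>();
            serverBucketMap.put(primary, buckets);
          }
          buckets.add(entry.getKey());
        }
        return serverBucketMap;
      }

      public static void main(String[] args) {
        Map<Integer, List<String>> bucketMap = new HashMap<>();
        bucketMap.put(0, Arrays.asList("server1:40404", "server2:40404"));
        bucketMap.put(1, Arrays.asList("server2:40404", "server1:40404"));
        bucketMap.put(2, Arrays.asList("server1:40404"));
        // prints something like {server1:40404=[0, 2], server2:40404=[1]}
        System.out.println(toServerBucketSetMap(bucketMap));
      }
    }

The real function instead checks BucketServerLocation66.isPrimary() and stops at the first primary it finds for each bucket.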
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java
new file mode 100644
index 0000000..f79c948
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java
@@ -0,0 +1,203 @@
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
+import com.gemstone.gemfire.cache.query.types.ObjectType;
+import com.gemstone.gemfire.cache.query.types.StructType;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * StructStreamingResultSender and StructStreamingResultCollector are paired
+ * to transfer a result that is a list of `com.gemstone.gemfire.cache.query.Struct`
+ * from the GemFire server to the Spark Connector (the client of the GemFire server)
+ * in a streaming fashion, i.e., while the sender is still sending the result,
+ * the collector can start processing the chunks that have already arrived
+ * without waiting for the full result to become available.
+ */
+public class StructStreamingResultSender {
+
+  public static final byte TYPE_CHUNK   = 0x30;
+  public static final byte DATA_CHUNK   = 0x31;
+  public static final byte ERROR_CHUNK  = 0x32;
+  public static final byte SER_DATA     = 0x41;
+  public static final byte UNSER_DATA   = 0x42;
+  public static final byte BYTEARR_DATA = 0x43;
+
+  private static ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class);
+  public static StructTypeImpl KeyValueType = new StructTypeImpl(new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField});
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int CHUNK_SIZE = 4096;
+  
+  // Note: The type of ResultSender returned from GemFire FunctionContext is
+  //  always ResultSender<Object>, so can't use ResultSender<byte[]> here
+  private final ResultSender<Object> sender;
+  private final StructType structType;
+  private final Iterator<Object[]> rows;
+  private String desc;
+  private boolean closed = false;
+
+  /**
+   * Constructor.
+   * @param sender the base ResultSender that sends data as byte arrays
+   * @param type the StructType of the result records
+   * @param rows the iterator over the collection of result rows
+   * @param desc description of this result (used for logging)
+   */
+  public StructStreamingResultSender(
+    ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, String desc) {
+    if (sender == null || rows == null)
+      throw new NullPointerException("sender=" + sender + ", rows=" + rows);
+    this.sender = sender;
+    this.structType = type;
+    this.rows = rows;
+    this.desc = desc;
+  }
+
+  /** Constructor with the default `desc` */
+  public StructStreamingResultSender(
+          ResultSender<Object> sender, StructType type, Iterator<Object[]> rows) {
+    this(sender, type, rows, "StructStreamingResultSender");
+  }
+  
+  /**
+   * Send the result in chunks. There are 3 types of chunk: TYPE, DATA, and ERROR.
+   * A TYPE chunk carries the struct type info, a DATA chunk carries rows of data,
+   * and an ERROR chunk carries an exception. There is at most one TYPE chunk
+   * (omitted for `KeyValueType`) and at most one ERROR chunk (only if there is an
+   * error), but usually there are multiple DATA chunks, each containing multiple
+   * rows of data. The chunk size is determined by the constant `CHUNK_SIZE`. If an
+   * exception is thrown, it is serialized and sent as the last chunk of the
+   * result (in the form of an ERROR chunk).
+   */
+  public void send() {
+    if (closed) throw new RuntimeException("sender is closed.");
+
+    HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null);
+    String dataType = null;
+    int typeSize = 0;
+    int rowCount = 0;
+    int dataSize = 0;            
+    try {
+      if (rows.hasNext()) {
+        // Note: only send type info if there's data with it
+        typeSize = sendType(buf);
+        buf.writeByte(DATA_CHUNK);
+        int rowSize = structType == null ? 2 : structType.getFieldNames().length;
+        while (rows.hasNext()) {
+          rowCount ++;
+          Object[] row = rows.next();          
+          if (rowCount < 2) dataType = entryDataType(row);
+          if (rowSize != row.length) 
+            throw new IOException(rowToString("Expect "  + rowSize + " columns, but got ", row));
+          serializeRowToBuffer(row, buf);
+          if (buf.size() > CHUNK_SIZE) {
+            dataSize += sendBufferredData(buf, false);
+            buf.writeByte(DATA_CHUNK);
+          }
+        }
+      }
+      // send last piece of data or empty byte array
+      dataSize += sendBufferredData(buf, true);
+      logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" +
+                  typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
+                  (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) dataSize)/rowCount)));
+    } catch (IOException | RuntimeException e) {
+      sendException(buf, e);
+    } finally {
+      closed = true;
+    }
+  }
+
+  private String rowToString(String rowDesc, Object[] row) {
+    StringBuilder buf = new StringBuilder();
+    buf.append(rowDesc).append("(");
+    for (int i = 0; i < row.length; i++) buf.append(i == 0 ? "" : ", ").append(row[i]);
+    return buf.append(")").toString();
+  }
+
+  private String entryDataType(Object[] row) {
+    StringBuilder buf = new StringBuilder();
+    buf.append("(");
+    for (int i = 0; i < row.length; i++) {
+      if (i != 0) buf.append(", ");
+      buf.append(row[i].getClass().getCanonicalName());
+    }
+    return buf.append(")").toString();
+  }
+  
+  private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException {
+    for (Object data : row) {
+      if (data instanceof CachedDeserializable) {
+        buf.writeByte(SER_DATA);
+        DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf);
+      } else if (data instanceof byte[]) {
+        buf.writeByte(BYTEARR_DATA);
+        DataSerializer.writeByteArray((byte[]) data, buf);
+      } else {
+        buf.writeByte(UNSER_DATA);
+        DataSerializer.writeObject(data, buf);
+      }
+    }
+  }
+  
+  /** Send the struct type info and return the number of bytes sent (0 when no type info is sent) */
+  private int sendType(HeapDataOutputStream buf) throws IOException {
+    // logger.info(desc + " struct type: " + structType);
+    if (structType != null) {
+      buf.writeByte(TYPE_CHUNK);
+      DataSerializer.writeObject(structType, buf);
+      return sendBufferredData(buf, false);      
+    } else {
+      return 0;  // default KeyValue type, no type info sent
+    }
+  }
+  
+  private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) throws IOException {
+    if (isLast) sender.lastResult(buf.toByteArray());
+    else sender.sendResult(buf.toByteArray());
+    // logData(buf.toByteArray(), desc);
+    int s = buf.size();
+    buf.reset();
+    return s;    
+  }
+
+  /** Send the exception as the last chunk of the result. */
+  private void sendException(HeapDataOutputStream buf, Exception e) {
+    // Note: if an exception happens during serialization, `buf` may contain
+    // partially serialized data, which could cause deserialization to hang or fail.
+    // Therefore, always empty the buffer before sending the exception.
+    if (buf.size() > 0) buf.reset();
+    
+    try {
+      buf.writeByte(ERROR_CHUNK);
+      DataSerializer.writeObject(e, buf);
+    } catch (IOException ioe) {
+      logger.error("StructStreamingResultSender failed to send the result:", e);
+      logger.error("StructStreamingResultSender failed to serialize the exception:", ioe);
+      buf.reset();
+    }
+    // Note: if serialization of the exception fails, an empty chunk is sent as
+    // the last result and the error is logged on the GemFire server side.
+    sender.lastResult(buf.toByteArray());
+    // logData(buf.toByteArray(), desc);
+  }
+
+//  private void logData(byte[] data, String desc) {
+//    StringBuilder buf = new StringBuilder();
+//    buf.append(desc);
+//    for (byte b : data) {
+//      buf.append(" ").append(b);
+//    }
+//    logger.info(buf.toString());
+//  }
+
+}

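The matching collector lives on the Scala side of the connector, so for clarity here is a rough, hypothetical Java decoder for the chunk format described in the send() javadoc above. It assumes the chunks from a single server arrive in order (e.g., gathered through a ResultCollector) and that still-serialized values may simply be kept as byte arrays:

    import com.gemstone.gemfire.DataSerializer;
    import com.gemstone.gemfire.cache.query.types.StructType;
    import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.StructStreamingResultSender;

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.util.ArrayList;
    import java.util.List;

    public class StructChunkReaderSketch {

      public static List<Object[]> read(List<byte[]> chunks) throws Exception {
        // default when no TYPE chunk is sent (plain key/value results)
        StructType type = StructStreamingResultSender.KeyValueType;
        List<Object[]> rows = new ArrayList<>();

        for (byte[] chunk : chunks) {
          if (chunk.length == 0) continue;                          // empty last chunk
          DataInputStream in = new DataInputStream(new ByteArrayInputStream(chunk));
          byte chunkType = in.readByte();

          if (chunkType == StructStreamingResultSender.TYPE_CHUNK) {
            type = DataSerializer.readObject(in);                   // struct type info
          } else if (chunkType == StructStreamingResultSender.ERROR_CHUNK) {
            Exception cause = DataSerializer.readObject(in);        // exception from the server
            throw new RuntimeException("remote execution failed", cause);
          } else if (chunkType == StructStreamingResultSender.DATA_CHUNK) {
            int width = type.getFieldNames().length;
            while (in.available() > 0) {
              Object[] row = new Object[width];
              for (int i = 0; i < width; i++) {
                byte marker = in.readByte();                        // SER_DATA, UNSER_DATA or BYTEARR_DATA
                if (marker == StructStreamingResultSender.UNSER_DATA) {
                  row[i] = DataSerializer.readObject(in);           // plain object
                } else {
                  row[i] = DataSerializer.readByteArray(in);        // still-serialized value or raw byte[]
                }
              }
              rows.add(row);
            }
          } else {
            throw new IllegalStateException("unknown chunk type: " + chunkType);
          }
        }
        return rows;
      }
    }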
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java b/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
new file mode 100644
index 0000000..ed7cef0
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
@@ -0,0 +1,38 @@
+package ittest.io.pivotal.gemfire.spark.connector;
+
+import java.io.Serializable;
+
+public class Employee implements Serializable {
+
+  private String name;
+
+  private int age;
+
+  public Employee(String n, int a) {
+    name = n;
+    age = a;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getAge() {
+    return age;
+  }
+
+  public String toString() {
+    return new StringBuilder().append("Employee[name=").append(name).
+            append(", age=").append(age).
+            append("]").toString();
+  }
+  
+  public boolean equals(Object o) {
+    if (o instanceof Employee) {
+      return ((Employee) o).name.equals(name) && ((Employee) o).age == age;
+    }
+    return false;
+  }
+
+}
+

