geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinmeil...@apache.org
Subject [33/50] [abbrv] incubator-geode git commit: GEODE-1244: Revert rename of package, directory, project and file rename for geode-spark-connector
Date Thu, 21 Apr 2016 17:17:08 GMT
GEODE-1244: Revert rename of package, directory, project and file rename for geode-spark-connector

This reverts commit 760c6e225a0269bcc80161bb0545db22a130e9b7.
The renaming didn't move the files and instead blew away the geode-functions directory


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ddee87fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ddee87fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ddee87fe

Branch: refs/heads/feature/GEODE-17-2
Commit: ddee87fea9e3ada0f2e05cd50c1fb0d3a9ebba68
Parents: f12ece5
Author: Jason Huynh <huynhja@gmail.com>
Authored: Wed Apr 20 10:29:42 2016 -0700
Committer: Jason Huynh <huynhja@gmail.com>
Committed: Wed Apr 20 10:31:30 2016 -0700

----------------------------------------------------------------------
 geode-spark-connector/doc/10_demos.md           |  12 +-
 geode-spark-connector/doc/1_building.md         |   6 +-
 geode-spark-connector/doc/2_quick.md            |  26 +-
 geode-spark-connector/doc/3_connecting.md       |  38 +-
 geode-spark-connector/doc/4_loading.md          |  30 +-
 geode-spark-connector/doc/5_rdd_join.md         |  26 +-
 geode-spark-connector/doc/6_save_rdd.md         |  20 +-
 geode-spark-connector/doc/7_save_dstream.md     |  10 +-
 geode-spark-connector/doc/8_oql.md              |   8 +-
 geode-spark-connector/doc/9_java_api.md         |  36 +-
 .../connector/internal/RegionMetadata.java      |  93 +++
 .../gemfirefunctions/QueryFunction.java         |  99 +++
 .../RetrieveRegionFunction.java                 | 208 +++++++
 .../RetrieveRegionMetadataFunction.java         | 118 ++++
 .../StructStreamingResultSender.java            | 219 +++++++
 .../gemfire/spark/connector/Employee.java       |  54 ++
 .../spark/connector/JavaApiIntegrationTest.java | 424 +++++++++++++
 .../gemfire/spark/connector/Portfolio.java      | 109 ++++
 .../gemfire/spark/connector/Position.java       |  73 +++
 .../src/it/resources/test-regions.xml           |  49 ++
 .../src/it/resources/test-retrieve-regions.xml  |  57 ++
 .../spark/connector/BasicIntegrationTest.scala  | 598 +++++++++++++++++++
 .../RDDJoinRegionIntegrationTest.scala          | 300 ++++++++++
 .../RetrieveRegionIntegrationTest.scala         | 253 ++++++++
 .../gemfire/spark/connector/package.scala       |  29 +
 .../connector/testkit/GemFireCluster.scala      |  47 ++
 .../spark/connector/testkit/GemFireRunner.scala | 148 +++++
 .../spark/connector/testkit/IOUtils.scala       |  94 +++
 .../spark/streaming/ManualClockHelper.scala     |  28 +
 .../spark/streaming/TestInputDStream.scala      |  44 ++
 .../javaapi/GemFireJavaDStreamFunctions.java    |  86 +++
 .../GemFireJavaPairDStreamFunctions.java        |  77 +++
 .../javaapi/GemFireJavaPairRDDFunctions.java    | 238 ++++++++
 .../javaapi/GemFireJavaRDDFunctions.java        | 178 ++++++
 .../javaapi/GemFireJavaSQLContextFunctions.java |  49 ++
 .../GemFireJavaSparkContextFunctions.java       |  87 +++
 .../connector/javaapi/GemFireJavaUtil.java      | 122 ++++
 .../spark/connector/GemFireConnection.scala     |  67 +++
 .../spark/connector/GemFireConnectionConf.scala |  73 +++
 .../connector/GemFireConnectionManager.scala    |  31 +
 .../connector/GemFireFunctionDeployer.scala     |  81 +++
 .../connector/GemFireKryoRegistrator.scala      |  29 +
 .../connector/GemFirePairRDDFunctions.scala     | 140 +++++
 .../spark/connector/GemFireRDDFunctions.scala   | 120 ++++
 .../connector/GemFireSQLContextFunctions.scala  |  42 ++
 .../GemFireSparkContextFunctions.scala          |  39 ++
 .../internal/DefaultGemFireConnection.scala     | 164 +++++
 .../DefaultGemFireConnectionManager.scala       |  77 +++
 .../connector/internal/LocatorHelper.scala      | 135 +++++
 .../StructStreamingResultCollector.scala        | 152 +++++
 .../connector/internal/oql/QueryParser.scala    |  58 ++
 .../spark/connector/internal/oql/QueryRDD.scala |  83 +++
 .../internal/oql/QueryResultCollector.scala     |  69 +++
 .../connector/internal/oql/RDDConverter.scala   |  40 ++
 .../connector/internal/oql/RowBuilder.scala     |  38 ++
 .../connector/internal/oql/SchemaBuilder.scala  |  73 +++
 .../internal/oql/UndefinedSerializer.scala      |  46 ++
 .../connector/internal/rdd/GemFireJoinRDD.scala |  67 +++
 .../internal/rdd/GemFireOuterJoinRDD.scala      |  69 +++
 .../internal/rdd/GemFireRDDPartition.scala      |  36 ++
 .../internal/rdd/GemFireRDDPartitioner.scala    |  59 ++
 .../rdd/GemFireRDDPartitionerImpl.scala         |  89 +++
 .../internal/rdd/GemFireRDDWriter.scala         |  82 +++
 .../internal/rdd/GemFireRegionRDD.scala         | 138 +++++
 .../javaapi/GemFireJavaRegionRDD.scala          |  26 +
 .../spark/connector/javaapi/JavaAPIHelper.scala |  53 ++
 .../gemfire/spark/connector/package.scala       |  69 +++
 .../streaming/GemFireDStreamFunctions.scala     |  89 +++
 .../spark/connector/streaming/package.scala     |  32 +
 .../gemfire/spark/connector/JavaAPITest.java    | 163 +++++
 .../connector/GemFireFunctionDeployerTest.scala |  58 ++
 .../DefaultGemFireConnectionManagerTest.scala   |  82 +++
 ...tStreamingResultSenderAndCollectorTest.scala | 254 ++++++++
 .../internal/oql/QueryParserTest.scala          |  83 +++
 .../connector/ConnectorImplicitsTest.scala      |  50 ++
 .../connector/GemFireConnectionConfTest.scala   | 100 ++++
 .../connector/GemFireDStreamFunctionsTest.scala |  79 +++
 .../connector/GemFireRDDFunctionsTest.scala     | 139 +++++
 .../spark/connector/LocatorHelperTest.scala     | 168 ++++++
 .../rdd/GemFireRDDPartitionerTest.scala         | 190 ++++++
 .../connector/rdd/GemFireRegionRDDTest.scala    | 117 ++++
 .../basic-demos/src/main/java/demo/Emp.java     |  95 +++
 .../src/main/java/demo/OQLJavaDemo.java         |  59 ++
 .../src/main/java/demo/PairRDDSaveJavaDemo.java |  86 +++
 .../src/main/java/demo/RDDSaveJavaDemo.java     |  85 +++
 .../src/main/java/demo/RegionToRDDJavaDemo.java |  57 ++
 .../src/main/scala/demo/NetworkWordCount.scala  |  75 +++
 .../pivotal/geode/spark/connector/Employee.java |  54 --
 .../spark/connector/JavaApiIntegrationTest.java | 424 -------------
 .../geode/spark/connector/Portfolio.java        | 109 ----
 .../pivotal/geode/spark/connector/Position.java |  73 ---
 .../src/it/resources/test-regions.xml           |  49 --
 .../src/it/resources/test-retrieve-regions.xml  |  57 --
 .../spark/connector/BasicIntegrationTest.scala  | 598 -------------------
 .../RDDJoinRegionIntegrationTest.scala          | 300 ----------
 .../RetrieveRegionIntegrationTest.scala         | 253 --------
 .../pivotal/geode/spark/connector/package.scala |  29 -
 .../spark/connector/testkit/GeodeCluster.scala  |  47 --
 .../spark/connector/testkit/GeodeRunner.scala   | 148 -----
 .../geode/spark/connector/testkit/IOUtils.scala |  94 ---
 .../spark/streaming/ManualClockHelper.scala     |  28 -
 .../spark/streaming/TestInputDStream.scala      |  44 --
 .../javaapi/GeodeJavaDStreamFunctions.java      |  86 ---
 .../javaapi/GeodeJavaPairDStreamFunctions.java  |  77 ---
 .../javaapi/GeodeJavaPairRDDFunctions.java      | 238 --------
 .../javaapi/GeodeJavaRDDFunctions.java          | 178 ------
 .../javaapi/GeodeJavaSQLContextFunctions.java   |  49 --
 .../javaapi/GeodeJavaSparkContextFunctions.java |  87 ---
 .../spark/connector/javaapi/GeodeJavaUtil.java  | 122 ----
 .../geode/spark/connector/GeodeConnection.scala |  67 ---
 .../spark/connector/GeodeConnectionConf.scala   |  73 ---
 .../connector/GeodeConnectionManager.scala      |  31 -
 .../spark/connector/GeodeFunctionDeployer.scala |  81 ---
 .../spark/connector/GeodeKryoRegistrator.scala  |  29 -
 .../spark/connector/GeodePairRDDFunctions.scala | 140 -----
 .../spark/connector/GeodeRDDFunctions.scala     | 120 ----
 .../connector/GeodeSQLContextFunctions.scala    |  42 --
 .../connector/GeodeSparkContextFunctions.scala  |  39 --
 .../internal/DefaultGeodeConnection.scala       | 164 -----
 .../DefaultGeodeConnectionManager.scala         |  77 ---
 .../connector/internal/LocatorHelper.scala      | 135 -----
 .../StructStreamingResultCollector.scala        | 152 -----
 .../connector/internal/oql/QueryParser.scala    |  58 --
 .../spark/connector/internal/oql/QueryRDD.scala |  83 ---
 .../internal/oql/QueryResultCollector.scala     |  69 ---
 .../connector/internal/oql/RDDConverter.scala   |  40 --
 .../connector/internal/oql/RowBuilder.scala     |  38 --
 .../connector/internal/oql/SchemaBuilder.scala  |  73 ---
 .../internal/oql/UndefinedSerializer.scala      |  46 --
 .../connector/internal/rdd/GeodeJoinRDD.scala   |  67 ---
 .../internal/rdd/GeodeOuterJoinRDD.scala        |  69 ---
 .../internal/rdd/GeodeRDDPartition.scala        |  36 --
 .../internal/rdd/GeodeRDDPartitioner.scala      |  59 --
 .../internal/rdd/GeodeRDDPartitionerImpl.scala  |  89 ---
 .../connector/internal/rdd/GeodeRDDWriter.scala |  82 ---
 .../connector/internal/rdd/GeodeRegionRDD.scala | 138 -----
 .../connector/javaapi/GeodeJavaRegionRDD.scala  |  26 -
 .../spark/connector/javaapi/JavaAPIHelper.scala |  53 --
 .../pivotal/geode/spark/connector/package.scala |  69 ---
 .../streaming/GeodeDStreamFunctions.scala       |  89 ---
 .../spark/connector/streaming/package.scala     |  32 -
 .../geode/spark/connector/JavaAPITest.java      | 163 -----
 .../connector/GeodeFunctionDeployerTest.scala   |  58 --
 .../DefaultGemFireConnectionManagerTest.scala   |  82 ---
 ...tStreamingResultSenderAndCollectorTest.scala | 254 --------
 .../internal/oql/QueryParserTest.scala          |  83 ---
 .../connector/ConnectorImplicitsTest.scala      |  50 --
 .../connector/GeodeConnectionConfTest.scala     | 100 ----
 .../connector/GeodeDStreamFunctionsTest.scala   |  79 ---
 .../spark/connector/GeodeRDDFunctionsTest.scala | 139 -----
 .../spark/connector/LocatorHelperTest.scala     | 168 ------
 .../connector/rdd/GeodeRDDPartitionerTest.scala | 190 ------
 .../connector/rdd/GeodeRegionRDDTest.scala      | 117 ----
 .../basic-demos/src/main/java/demo/Emp.java     |  95 ---
 .../src/main/java/demo/OQLJavaDemo.java         |  59 --
 .../src/main/java/demo/PairRDDSaveJavaDemo.java |  86 ---
 .../src/main/java/demo/RDDSaveJavaDemo.java     |  85 ---
 .../src/main/java/demo/RegionToRDDJavaDemo.java |  57 --
 .../src/main/scala/demo/NetworkWordCount.scala  |  75 ---
 .../project/Dependencies.scala                  |   8 +-
 .../project/GemFireSparkBuild.scala             |  76 +++
 .../project/GeodeSparkBuild.scala               |  76 ---
 geode-spark-connector/project/Settings.scala    |   4 +-
 163 files changed, 8375 insertions(+), 7638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/10_demos.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/10_demos.md b/geode-spark-connector/doc/10_demos.md
index 4b4777a..30da687 100644
--- a/geode-spark-connector/doc/10_demos.md
+++ b/geode-spark-connector/doc/10_demos.md
@@ -40,21 +40,21 @@ gfsh> create region --name=str_int_region --type=PARTITION --key-constraint=java
 
 And deploy Geode functions required by the Spark Geode Connector:
 ```
-gfsh> deploy --jar=<path to connector project>/geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar
+gfsh> deploy --jar=<path to connector project>/gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar
 ```
 
 ### Run simple demos
 This section describes how to run `RDDSaveJavaDemo.java`, 
 `PairRDDSaveJavaDemo.java` and `RegionToRDDJavaDemo.java`:
 ```
-export SPARK_CLASSPATH=$CONNECTOR/geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jar:$GEODE/lib/server-dependencies.jar
+export SPARK_CLASSPATH=$CONNECTOR/gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar:$GEODE/lib/server-dependencies.jar
 
 cd <spark 1.3 dir>
-bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
+bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
 
-bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
+bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
 
-bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
+bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port]
 ```
 
 ### Run stateful network word count
@@ -67,7 +67,7 @@ $ nc -lk 9999
 
 **Terminal-2**, start word count Spark app: 
 ```
-bin/spark-submit --master=local[2] demo.NetworkWordCount $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port`
+bin/spark-submit --master=local[2] demo.NetworkWordCount $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port`
 ```
 
 Switch to Terminal-1, type some words, and hit `enter` or `return` key, then check word count at **Terminal-3**, which has `gfsh` connected to the Geode cluster:

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/1_building.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/1_building.md b/geode-spark-connector/doc/1_building.md
index 47b17db..9b4d69e 100644
--- a/geode-spark-connector/doc/1_building.md
+++ b/geode-spark-connector/doc/1_building.md
@@ -17,9 +17,9 @@ sbt clean package
 ```
 
 The following jar files will be created:
- - `geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jar`
- - `geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar`
- - `geode-spark-demos/target/scala-2.10/geode-spark-demos_2.10-0.5.0.jar `
+ - `gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar`
+ - `gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar`
+ - `gemfire-spark-demos/target/scala-2.10/gemfire-spark-demos_2.10-0.5.0.jar `
 
 ### Testing
 Commands to run unit and integration tests:

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/2_quick.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/2_quick.md b/geode-spark-connector/doc/2_quick.md
index 01f3c06..ec331c3 100644
--- a/geode-spark-connector/doc/2_quick.md
+++ b/geode-spark-connector/doc/2_quick.md
@@ -15,7 +15,7 @@ is a good starting point.
 You need 2 terminals to follow along, one for Geode shell `gfsh`, and one for Spark shell. Set up Jdk 1.7 on both of them.
 
 ### Geode `gfsh` terminal
-In this terminal, start Geode cluster, deploy Spark Geode Connector's geode-function jar, and create demo regions.
+In this terminal, start Geode cluster, deploy Spark Geode Connector's gemfire-function jar, and create demo regions.
 
 Set up environment variables:
 ```
@@ -41,9 +41,9 @@ gfsh>create region --name=str_str_region --type=PARTITION --key-constraint=java.
 gfsh>create region --name=int_str_region --type=PARTITION --key-constraint=java.lang.Integer --value-constraint=java.lang.String
 ```
 
-Deploy Spark Geode Connector's geode-function jar (`geode-functions_2.10-0.5.0.jar`):
+Deploy Spark Geode Connector's gemfire-function jar (`gemfire-functions_2.10-0.5.0.jar`):
 ```
-gfsh>deploy --jar=<path to connector project>/geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar
+gfsh>deploy --jar=<path to connector project>/gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar
 ```
 
 ### Spark shell terminal
@@ -52,7 +52,7 @@ In this terminal, setup Spark environment, and start Spark shell.
 Set Geode locator property in Spark configuration: add 
 following to `<spark-dir>/conf/spark-defaults.conf`:
 ```
-spark.geode.locators=localhost[55221]
+spark.gemfire.locators=localhost[55221]
 ```
 Note:
  - if the file doesn't exist, create one. 
@@ -69,20 +69,20 @@ under the same directory to `log4j.properties` and update the file.
 
 Start spark-shell:
 ```
-bin/spark-shell --master local[*] --jars $CONNECTOR/geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jar,$GEODE/lib/server-dependencies.jar
+bin/spark-shell --master local[*] --jars $CONNECTOR/gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar,$GEODE/lib/server-dependencies.jar
 ```
 
 Check Geode locator property in the Spark shell:
 ```
-scala> sc.getConf.get("spark.geode.locators")
+scala> sc.getConf.get("spark.gemfire.locators")
 res0: String = localhost[55221]
 ```
 
 In order to enable Geode specific functions, you need to import 
-`io.pivotal.geode.spark.connector._`
+`io.pivotal.gemfire.spark.connector._`
 ```
-scala> import io.pivotal.geode.spark.connector._
-import io.pivotal.geode.spark.connector._
+scala> import io.pivotal.gemfire.spark.connector._
+import io.pivotal.gemfire.spark.connector._
 ```
 
 ### Save Pair RDD to Geode
@@ -153,16 +153,16 @@ NEXT_STEP_NAME : END
 ### Expose Geode Region As RDD
 The same API is used to expose both replicated and partitioned region as RDDs. 
 ```
-scala> val rdd = sc.geodeRegion[String, String]("str_str_region")
-rdd: io.pivotal.geode.spark.connector.rdd.GemFireRDD[String,String] = GemFireRDD[2] at RDD at GemFireRDD.scala:19
+scala> val rdd = sc.gemfireRegion[String, String]("str_str_region")
+rdd: io.pivotal.gemfire.spark.connector.rdd.GemFireRDD[String,String] = GemFireRDD[2] at RDD at GemFireRDD.scala:19
 
 scala> rdd.foreach(println)
 (1,one)
 (3,three)
 (2,two)
 
-scala> val rdd2 = sc.geodeRegion[Int, String]("int_str_region")
-rdd2: io.pivotal.geode.spark.connector.rdd.GemFireRDD[Int,String] = GemFireRDD[3] at RDD at GemFireRDD.scala:19
+scala> val rdd2 = sc.gemfireRegion[Int, String]("int_str_region")
+rdd2: io.pivotal.gemfire.spark.connector.rdd.GemFireRDD[Int,String] = GemFireRDD[3] at RDD at GemFireRDD.scala:19
 
 scala> rdd2.foreach(println)
 (2,ab)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/3_connecting.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/3_connecting.md b/geode-spark-connector/doc/3_connecting.md
index 1a4dadd..8972be9 100644
--- a/geode-spark-connector/doc/3_connecting.md
+++ b/geode-spark-connector/doc/3_connecting.md
@@ -2,48 +2,48 @@
 
 There are two ways to connect Spark to Geode:
  - Specify Geode connection properties via `SparkConf`.
- - Specify Geode connection properties via `GeodeConnectionConf`.
+ - Specify Geode connection properties via `GemFireConnectionConf`.
 
 ### Specify Geode connection properties via `SparkConf`
-The only required Geode connection property is `spark.geode.locators`. 
+The only required Geode connection property is `spark.gemfire.locators`. 
 This can be specified in `<spark dir>/conf/spark-defaults.conf` or in Spark 
 application code. In the following examples, we assume you want to provide
 3 extra properties: `security-client-auth-init`, `security-username`, and 
-`security-password`, note that they are prefixed with `spark.geode.`.
+`security-password`, note that they are prefixed with `spark.gemfire.`.
  
 In `<spark dir>/conf/spark-defaults.com`
 ```
-spark.geode.locators=192.168.1.47[10334]
-spark.geode.security-client-auth-init=com.gemstone.geode.security.templates.UserPasswordAuthInit.create
-spark.geode.security-username=scott
-spark.geode.security-password=tiger
+spark.gemfire.locators=192.168.1.47[10334]
+spark.gemfire.security-client-auth-init=com.gemstone.gemfire.security.templates.UserPasswordAuthInit.create
+spark.gemfire.security-username=scott
+spark.gemfire.security-password=tiger
 ```
  
 Or in the Spark application code:
 ```
-import io.pivotal.geode.spark.connector._
+import io.pivotal.gemfire.spark.connector._
 val sparkConf = new SparkConf()
-  .set(GeodeLocatorPropKey, "192.168.1.47[10334]")
-  .set("spark.geode.security-client-auth-init", "com.gemstone.geode.security.templates.UserPasswordAuthInit.create")
-  .set("spark.geode.security-username", "scott")
-  .set("spark.geode.security-password", "tiger")
+  .set(GemFireLocatorPropKey, "192.168.1.47[10334]")
+  .set("spark.gemfire.security-client-auth-init", "com.gemstone.gemfire.security.templates.UserPasswordAuthInit.create")
+  .set("spark.gemfire.security-username", "scott")
+  .set("spark.gemfire.security-password", "tiger")
 ```
 
-After this, you can use all connector APIs without providing `GeodeConnectionConf`.
+After this, you can use all connector APIs without providing `GemfireConnectionConf`.
  
-### Specify Geode connection properties via `GeodeConnectionConf`
-Here's the code that creates `GeodeConnectionConf` with the same set of 
+### Specify Geode connection properties via `GemFireConnectionConf`
+Here's the code that creates `GemFireConnectionConf` with the same set of 
 properties as the examples above:
 ```
-val props = Map("security-client-auth-init" -> "com.gemstone.geode.security.templates.UserPasswordAuthInit.create",
+val props = Map("security-client-auth-init" -> "com.gemstone.gemfire.security.templates.UserPasswordAuthInit.create",
                 "security-username" -> "scott",
                 "security-password" -> "tiger")
-val connConf = GeodeConnectionConf("192.168.1.47[10334]", props)
+val connConf = GemFireConnectionConf("192.168.1.47[10334]", props)
 ``` 
 
-Please note that those properties are **not** prefixed with `spark.geode.`.
+Please note that those properties are **not** prefixed with `spark.gemfire.`.
 
-After this, you can use all connector APIs that require `GeodeConnectionConf`.
+After this, you can use all connector APIs that require `GemFireConnectionConf`.
 
 ### Notes about locators
  - You can specify locator in two formats: `host[port]` or `host:port`. For

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/4_loading.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/4_loading.md b/geode-spark-connector/doc/4_loading.md
index d6789dd..b67a96e 100644
--- a/geode-spark-connector/doc/4_loading.md
+++ b/geode-spark-connector/doc/4_loading.md
@@ -1,16 +1,16 @@
 ## Loading Data from Geode
 
 To expose full data set of a Geode region as a Spark
-RDD, call `geodeRegion` method on the SparkContext object.
+RDD, call `gemfireRegion` method on the SparkContext object.
 
 ```
-val rdd = sc.geodeRegion("region path")
+val rdd = sc.gemfireRegion("region path")
 ```
 
-Or with specific `GeodeConectionConf` object instance (see 
-[Connecting to  Geode](3_connecting.md) for how to create GeodeConectionConf):
+Or with specific `GemfireConectionConf` object instance (see 
+[Connecting to  Geode](3_connecting.md) for how to create GemfireConectionConf):
 ```
-val rdd = sc.geodeRegion("region path", connConf)
+val rdd = sc.gemfireRegion("region path", connConf)
 ```
 
 ## Geode RDD Partitions
@@ -22,24 +22,24 @@ upon multiple servers, and may have duplicates for high
 availability.
 
 Since replicated region has its full dataset available on every
-server, there is only one RDD partition for a `GeodeRegionRDD` that 
+server, there is only one RDD partition for a `GemFireRegionRDD` that 
 represents a replicated region.
 
-For a `GeodeRegionRDD` that represents a partitioned region, there are 
+For a `GemFireRegionRDD` that represents a partitioned region, there are 
 many potential  ways to create RDD partitions. So far, we have 
 implemented ServerSplitsPartitioner, which will split the bucket set
 on each Geode server into two RDD partitions by default.
 The number of splits is configurable, the following shows how to set 
 three partitions per Geode server:
 ```
-import io.pivotal.geode.spark.connector._
+import io.pivotal.gemfire.spark.connector._
 
 val opConf = Map(PreferredPartitionerPropKey -> ServerSplitsPartitionerName,
                  NumberPartitionsPerServerPropKey -> "3")
 
-val rdd1 = sc.geodeRegion[String, Int]("str_int_region", opConf = opConf)
+val rdd1 = sc.gemfireRegion[String, Int]("str_int_region", opConf = opConf)
 // or
-val rdd2 = sc.geodeRegion[String, Int]("str_int_region", connConf, opConf)  
+val rdd2 = sc.gemfireRegion[String, Int]("str_int_region", connConf, opConf)  
 ```
 
 
@@ -48,7 +48,7 @@ Server-side filtering allow exposing partial dataset of a Geode region
 as a RDD, this reduces the amount of data transferred from Geode to 
 Spark to speed up processing.
 ```
-val rdd = sc.geodeRegion("<region path>").where("<where clause>")
+val rdd = sc.gemfireRegion("<region path>").where("<where clause>")
 ```
 
 The above call is translated to OQL query `select key, value from /<region path>.entries where <where clause>`, then 
@@ -59,7 +59,7 @@ In the following demo, javabean class `Emp` is used, it has 5 attributes: `id`,
 In order to make `Emp` class available on Geode servers, we need to deploy a jar file that contains `Emp` class, 
 now build the `emp.jar`,  deploy it and create region `emps` in `gfsh`:
 ```
-zip $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar \
+zip $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar \
   -i "demo/Emp.class" --out $CONNECTOR/emp.jar
   
 gfsh
@@ -73,7 +73,7 @@ only contains `Emp.class`.
 Now in Spark shell, generate some random `Emp` records, and save them to region `emps` (remember to add `emp.jar` to 
 Spark shell classpath before starting Spark shell):
 ```
-import io.pivotal.geode.spark.connector._
+import io.pivotal.gemfire.spark.connector._
 import scala.util.Random
 import demo.Emp
 
@@ -84,12 +84,12 @@ def rpick(xs: List[String]): String = xs(Random.nextInt(xs.size))
 
 val d1 = (1 to 20).map(x => new Emp(x, rpick(lnames), rpick(fnames), 20+Random.nextInt(41), rpick(locs))).toArray
 val rdd1 = sc.parallelize(d1) 
-rdd1.saveToGeode("emps", e => (e.getId, e))
+rdd1.saveToGemfire("emps", e => (e.getId, e))
 ```
 
 Now create a RDD that contains all employees whose age is less than 40, and display its contents:
 ```
-val rdd1s = sc.geodeRegion("emps").where("value.getAge() < 40")
+val rdd1s = sc.gemfireRegion("emps").where("value.getAge() < 40")
 
 rdd1s.foreach(println)
 (5,Emp(5, Taylor, Robert, 32, FL))

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/5_rdd_join.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/5_rdd_join.md b/geode-spark-connector/doc/5_rdd_join.md
index 5c0d6c8..81be061 100644
--- a/geode-spark-connector/doc/5_rdd_join.md
+++ b/geode-spark-connector/doc/5_rdd_join.md
@@ -2,7 +2,7 @@
 
 The Spark Geode Connector suports using any RDD as a source
 of a join and outer join with a Geode region through APIs
-`joinGeodeRegion[K, V]` and `outerJoinGeodeRegion[K, V]`. 
+`joinGemfireRegion[K, V]` and `outerJoinGemfireRegion[K, V]`. 
 Those two APIs execute a single `region.getAll` call for every 
 partition of the source RDD, so no unnecessary data will be requested
 or transferred. This means a join or outer join between any RDD and
@@ -20,8 +20,8 @@ following examples.
 ### RDD[(K, V1)] join and outer join Region[K, V2]
 
 In this case, the source RDD is a pair RDD,  and it has the same key
-type as the Region. Use API `rdd.joinGeodeRegion[K, V2](regionPath)` and 
-`rdd.outerJoinGeodeRegion[K, V2](regionPath)` do the join and outer
+type as the Region. Use API `rdd.joinGemfireRegion[K, V2](regionPath)` and 
+`rdd.outerJoinGemfireRegion[K, V2](regionPath)` do the join and outer
 join. 
 
 Prepare a source RDD `rdd2`:
@@ -49,7 +49,7 @@ rdd2.foreach(println)
 
 Join RDD `rdd2` with region `emps`, and print out the result:
 ```
-val rdd2j = rdd2.joinGeodeRegion[Int, Emp]("emps")
+val rdd2j = rdd2.joinGemfireRegion[Int, Emp]("emps")
 
 rdd2j.foreach(println)
 ((11,message-11),Emp(11, Taylor, Emma, 44, CA))
@@ -69,7 +69,7 @@ entries have those key values.
 
 Outer join RDD `rdd2` with region `emps`, and print out the result:
 ```
-val rdd2o = rdd2.outerJoinGeodeRegion[Int, Emp]("emps")
+val rdd2o = rdd2.outerJoinGemfireRegion[Int, Emp]("emps")
 
 rdd2o.foreach(println)
 ((18,message-18),Some(Emp(18, Powell, Alice, 58, FL)))
@@ -95,8 +95,8 @@ since there's no region entries have those key values.
 ### RDD[(K1, V1)] join and outer join Region[K2, V2]
 
 In this case, the source RDD is still a pair RDD,  but it has different
-key type. Use API `rdd.joinGeodeRegion[K2, V2](regionPath, func)` and 
-`rdd.outerJoinGeodeRegion[K2, V2](regionPath, func)` do the join and 
+key type. Use API `rdd.joinGemfireRegion[K2, V2](regionPath, func)` and 
+`rdd.outerJoinGemfireRegion[K2, V2](regionPath, func)` do the join and 
 outer join, where `func` is the function to generate key from (k, v)
 pair, the element of source RDD, to join with Geode region.
 
@@ -125,7 +125,7 @@ rdd3.foreach(println)
 
 Join RDD `rdd3` (RDD[(String, Int)] with region `emps` (Region[Int, Emp]), and print out the result:
 ```
-val rdd3j = rdd3.joinGeodeRegion[Int, Emp]("emps", pair => pair._2)
+val rdd3j = rdd3.joinGemfireRegion[Int, Emp]("emps", pair => pair._2)
 
 rdd3j.foreach(println)
 ((message-18,18),Emp(18, Powell, Alice, 58, FL))
@@ -144,7 +144,7 @@ RDD and join key.
 
 Outer join RDD `rdd3` with region `emps`, and print out the result:
 ```
-val rdd3o = rdd3.outerJoinGeodeRegion[Int, Emp]("emps", pair => pair._2)
+val rdd3o = rdd3.outerJoinGemfireRegion[Int, Emp]("emps", pair => pair._2)
 
 rdd3o.foreach(println)
 ((message-18,18),Some(Emp(18, Powell, Alice, 58, FL)))
@@ -166,8 +166,8 @@ rdd3o.foreach(println)
 
 ### RDD[T] join and outer join Region[K, V]
 
-Use API `rdd.joinGeodeRegion[K, V](regionPath, func)` and 
-`rdd.outerJoinGeodeRegion[K, V](regionPath, func)` do the join
+Use API `rdd.joinGemfireRegion[K, V](regionPath, func)` and 
+`rdd.outerJoinGemfireRegion[K, V](regionPath, func)` do the join
 and outer join, where `func` is the function to generate key from
 `t`, the element of source RDD, to join with Geode region.
 
@@ -196,7 +196,7 @@ rdd4.foreach(println)
 
 Join RDD `d4` with region `emps`, and print out the result:
 ```
-val rdd4j = rdd4.joinGeodeRegion[Int, Emp]("emps", x => x/2)
+val rdd4j = rdd4.joinGemfireRegion[Int, Emp]("emps", x => x/2)
 
 rdd4j.foreach(println)
 (22,Emp(11, Taylor, Emma, 44, CA))
@@ -213,7 +213,7 @@ rdd4j.foreach(println)
 
 Outer join RDD `d4` with region `emps`, and print out the result:
 ```
-val rdd4o = rdd4.outerJoinGeodeRegion[Int, Emp]("emps", x => x/2)
+val rdd4o = rdd4.outerJoinGemfireRegion[Int, Emp]("emps", x => x/2)
 
 rdd4o.foreach(println)
 (36,Some(Emp(18, Powell, Alice, 58, FL)))

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/6_save_rdd.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/6_save_rdd.md b/geode-spark-connector/doc/6_save_rdd.md
index 73fd8b0..004ef62 100644
--- a/geode-spark-connector/doc/6_save_rdd.md
+++ b/geode-spark-connector/doc/6_save_rdd.md
@@ -7,7 +7,7 @@ It is possible to save any RDD to a Geode region. The requirements are:
  - the target region exists.
 
 To save an RDD to an existing Geode region, import 
-`io.pivotal.geode.spark.connector._` and call the `saveToGeode` 
+`io.pivotal.gemfire.spark.connector._` and call the `saveToGemfire` 
 method on RDD.
 
 ### Save RDD[(K, V)] to Geode
@@ -15,13 +15,13 @@ For pair RDD, i.e., RDD[(K, V)], the pair is treated as key/value pair.
 ```
 val data = Array(("1","one"),("2","two"),("3","three"))
 val rdd = sc.parallelize(data)
-rdd.saveToGeode("str_str_region")
+rdd.saveToGemfire("str_str_region")
 ```
 
-If you create GeodeConnectionConf as described in 
+If you create GemFireConnectionConf as described in 
 [Connecting to Geode](3_connecting.md), the last statement becomes:
 ```
-rdd.saveToGeode("str_str_region", connConf)
+rdd.saveToGemFire("str_str_region", connConf)
 ```
 
 You can verify the region contents:
@@ -52,27 +52,27 @@ then the pair is save to Geode.
 ```
 val data2 = Array("a","ab","abc")
 val rdd2 = sc.parallelize(data2)
-rdd2.saveToGeode("str_int_region", e => (e, e.length))
-// or use GeodeConnectionConf object directly
-// rdd2.saveToGeode("rgnb", e => (e, e.length), connConf)
+rdd2.saveToGemfire("str_int_region", e => (e, e.length))
+// or use GemFireConnectionConf object directly
+// rdd2.saveToGemfire("rgnb", e => (e, e.length), connConf)
 ```
 
 ### `rdd.save.batch.size` 
 
-The connector invokes Geode API `putAll()` to save the data. To make
+The connector invokes GemFire API `putAll()` to save the data. To make
 `putAll()` more efficient, the connector invokes putAll() for every 
 10,000 entries by default. This batch size can be changed with optional
 parameter `opConf`. The following shows how to do it:
 
 ```
   // in Scala
-  rdd.saveToGeode(regionPath, opConf = Map(RDDSaveBatchSizePropKey -> "5000"))
+  rdd.saveToGemfire(regionPath, opConf = Map(RDDSaveBatchSizePropKey -> "5000"))
 
   // in Java
   Properties opConf = new Properties();
   opConf.put(RDDSaveBatchSizePropKey, "5000");
   ...
-  javaFunctions(rdd).saveToGeode(regionPath, opConf); 
+  javaFunctions(rdd).saveToGemfire(regionPath, opConf); 
    
   // note: RDDSaveBatchSizePropKey = "rdd.save.batch.size" 
 ```

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/7_save_dstream.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/7_save_dstream.md b/geode-spark-connector/doc/7_save_dstream.md
index a0019c6..ecc793b 100644
--- a/geode-spark-connector/doc/7_save_dstream.md
+++ b/geode-spark-connector/doc/7_save_dstream.md
@@ -48,16 +48,16 @@ ssc.awaitTermination() // Wait for the computation to terminate
 
 #### Spark Streaming With Geode
 Now let's save the running word count to Geode region `str_int_region`, which 
-simply replace print() with saveToGeode():
+simply replace print() with saveToGemfire():
 
 ```
-import io.pivotal.geode.spark.connector.streaming._
-runningCounts.saveToGeode("str_int_region")
+import io.pivotal.gemfire.spark.connector.streaming._
+runningCounts.saveToGemfire("str_int_region")
 ```
 
-You can use the version of saveToGeode that has the parameter `GeodeConnectionConf`:
+You can use the version of saveToGemfire that has the parameter `GemFireConnectionConf`:
 ```
-runningCounts.saveToGeode("str_int_region", connConf)
+runningCounts.saveToGemfire("str_int_region", connConf)
 ```
 
 See [Spark Streaming Programming Guide]

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/8_oql.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/8_oql.md b/geode-spark-connector/doc/8_oql.md
index ef32ef6..f409698 100644
--- a/geode-spark-connector/doc/8_oql.md
+++ b/geode-spark-connector/doc/8_oql.md
@@ -11,7 +11,7 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 
 Create a `DataFrame` using OQL:
 ```
-val dataFrame = sqlContext.geodeOQL("SELECT * FROM /CustomerRegion WHERE status = 'active'")
+val dataFrame = sqlContext.gemfireOQL("SELECT * FROM /CustomerRegion WHERE status = 'active'")
 ```
 
 You can repartition the `DataFrame` using `DataFrame.repartition()` if needed. 
@@ -33,9 +33,9 @@ If KryoSerializer is preferred, as described in [Spark Documentation]
 val conf = new SparkConf()
   .setAppName("MySparkApp")
   .setMaster("local[*]")
-  .set(GeodeLocatorPropKey, "localhost[55221]")
+  .set(GemFireLocatorPropKey, "localhost[55221]")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-  .set("spark.kryo.registrator", "io.pivotal.geode.spark.connector.GeodeKryoRegistrator")
+  .set("spark.kryo.registrator", "io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator")
 ```
 
 and register the classes (optional)
@@ -46,7 +46,7 @@ conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
 Use the following options to start Spark shell:
 ```
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
- --conf spark.kryo.registrator=io.pivotal.geode.spark.connector.GeodeKryoRegistrator
+ --conf spark.kryo.registrator=io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator
 ```
 
 ## References

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/doc/9_java_api.md
----------------------------------------------------------------------
diff --git a/geode-spark-connector/doc/9_java_api.md b/geode-spark-connector/doc/9_java_api.md
index 6fbc636..b9ac91e 100644
--- a/geode-spark-connector/doc/9_java_api.md
+++ b/geode-spark-connector/doc/9_java_api.md
@@ -6,25 +6,25 @@ understand how the Spark Geode Connector works.
 
 ### Prerequisites
 The best way to use the Spark Geode Connector Java API is to statically
-import all of the methods in `GeodeJavaUtil`. This utility class is
+import all of the methods in `GemFireJavaUtil`. This utility class is
 the main entry point for Spark Geode Connector Java API.
 ```
-import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;
+import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
 ```
 
 Create JavaSparkContext (don't forget about the static import):
 ```
 SparkConf conf = new SparkConf();
-conf.set(GeodeLocatorPropKey, "192.168.1.47[10334]")
+conf.set(GemFireLocatorPropKey, "192.168.1.47[10334]")
 JavaSparkContext jsc = new JavaSparkContext(conf);
 ```
 
 ### Accessing Geode region in Java
-Geode region is exposed as `GeodeJavaRegionRDD<K,V>`(subclass of
+Geode region is exposed as `GemFireJavaRegionRDD<K,V>`(subclass of
 `JavaPairRDD<K, V>`):
 ```
-GeodeJavaRegionRDD<Int, Emp> rdd1 = javaFunctions(jsc).geodeRegion("emps")
-GeodeJavaRegionRDD<Int, Emp> rdd2 = rdd1.where("value.getAge() < 40");
+GemFireJavaRegionRDD<Int, Emp> rdd1 = javaFunctions(jsc).gemfireRegion("emps")
+GemFireJavaRegionRDD<Int, Emp> rdd2 = rdd1.where("value.getAge() < 40");
 ```
 
 ### RDD Join and Outer Join
@@ -39,10 +39,10 @@ static class MyKeyFunction implements Function<Tuple2<String, Integer>, Integer>
 MyKeyFunction func = new MyKeyFunction();
 
 JavaPairRDD<Tuple2<String, Integer>, Emp> rdd3j =
-  javaFunction(rdd3).joinGeodeRegion("emps", func);
+  javaFunction(rdd3).joinGemfireRegion("emps", func);
 
 JavaPairRDD<Tuple2<String, Integer>, Option<Emp>> rdd3o = 
-  javaFunction(rdd3).outerJoinGeodeRegion("emps", func);
+  javaFunction(rdd3).outerJoinGemfireRegion("emps", func);
 
 ```
 
@@ -57,11 +57,11 @@ data.add(new Tuple2<>("9", "nine"));
 // create JavaPairRDD
 JavaPairRDD<String, String> rdd1 = jsc.parallelizePairs(data);
 // save to Geode
-javaFunctions(rdd1).saveToGeode("str_str_region");
+javaFunctions(rdd1).saveToGemfire("str_str_region");
 ```
 
 In order to save `JavaRDD<Tuple2<K,V>>`, it needs to be converted to 
-`JavaPairRDD<K,V>` via static method `toJavaPairRDD` from `GeodeJavaUtil`:
+`JavaPairRDD<K,V>` via static method `toJavaPairRDD` from `GemFireJavaUtil`:
 ```
 List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
 data2.add(new Tuple2<>("11", "eleven"));
@@ -71,7 +71,7 @@ data2.add(new Tuple2<>("13", "thirteen"));
 // create JavaRDD<Tuple2<K,V>>
 JavaRDD<Tuple2<String, String>> rdd2 =  jsc.parallelize(data2);
 // save to Geode
-javaFunctions(toJavaPairRDD(rdd2)).saveToGeode("str_str_region");
+javaFunctions(toJavaPairRDD(rdd2)).saveToGemfire("str_str_region");
 ``` 
 
 ### Saving JavaRDD to Geode
@@ -100,7 +100,7 @@ data.add("ab");
 data.add("abc");
 JavaRDD<String> jrdd =  sc.parallelize(data);
     
-javaFunctions(rdd).saveToGeode("str_int_region", pairFunc);
+javaFunctions(rdd).saveToGemfire("str_int_region", pairFunc);
 ```
 
 ### Saving JavaPairDStream and JavaDStream
@@ -108,21 +108,21 @@ Saving JavaPairDStream and JavaDStream is similar to saving JavaPairRDD
 jand JavaRDD:
 ```
 JavaPairDStream<String, String> ds1 = ...
-javaFunctions(ds1).saveToGeode("str_str_region");
+javaFunctions(ds1).saveToGemFire("str_str_region");
 
 JavaDStream<String> ds2 = ...
-javaFunctions(ds2).saveToGeode("str_int_region", pairFunc);
+javaFunctions(ds2).saveToGemFire("str_int_region", pairFunc);
 ```
 
 ### Using Geode OQL
 
-There are two geodeOQL Java APIs, with and without GeodeConnectionConf.
-Here is an example without GeodeConnectionConf, it will use default 
-GeodeConnectionConf internally.
+There are two gemfireOQL Java APIs, with and without GemFireConnectionConf.
+Here is an example without GemFireConnectionConf, it will use default 
+GemFireConnectionConf internally.
 ```
 // assume there's jsc: JavaSparkContext
 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc);
-DataFrame df = javaFunctions(sqlContext).geodeOQL("select * from /str_str_region");
+DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region");
 df.show();
 ```
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java
new file mode 100644
index 0000000..fde6204
--- /dev/null
+++ b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.gemfire.spark.connector.internal;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.io.Serializable;
+
+/**
+ * This class contains all info required by GemFire RDD partitioner to create partitions.
+ */
+public class RegionMetadata implements Serializable {
+
+  private String  regionPath;
+  private boolean isPartitioned;
+  private int     totalBuckets;
+  private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap;
+  private String  keyTypeName;
+  private String  valueTypeName;
+
+  /**
+   * Default constructor.
+   * @param regionPath the full path of the given region
+   * @param isPartitioned true for partitioned region, false otherwise
+   * @param totalBuckets number of total buckets for partitioned region, ignored otherwise
+   * @param serverBucketMap gemfire server (host:port pair) to bucket set map
+   * @param keyTypeName region key class name
+   * @param valueTypeName region value class name                    
+   */
+  public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap,
+                        String keyTypeName, String valueTypeName) {
+    this.regionPath = regionPath;
+    this.isPartitioned = isPartitioned;
+    this.totalBuckets = totalBuckets;
+    this.serverBucketMap = serverBucketMap;
+    this.keyTypeName = keyTypeName;
+    this.valueTypeName = valueTypeName;
+  }
+
+  public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) {
+    this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null);
+  }
+
+  public String getRegionPath() {
+    return regionPath;
+  }
+
+  public boolean isPartitioned() {
+    return isPartitioned;
+  }
+
+  public int getTotalBuckets() {
+    return totalBuckets;
+  }
+  
+  public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() {
+    return serverBucketMap;
+  }
+
+  public String getKeyTypeName() {
+    return keyTypeName;
+  }
+
+  public String getValueTypeName() {
+    return valueTypeName;
+  }
+
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("RegionMetadata(region=").append(regionPath)
+       .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")")
+       .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets)
+       .append(", map=").append(serverBucketMap).append(")");
+    return buf.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java
new file mode 100644
index 0000000..862bc9f
--- /dev/null
+++ b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.execute.*;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+import java.util.Iterator;
+
+public class QueryFunction implements Function {
+
+  private static final long serialVersionUID = 4866641340803692882L;
+
+  public final static String ID = "gemfire-spark-query-function";
+
+  private final static QueryFunction instance = new QueryFunction();
+
+  private static final Logger logger = LogService.getLogger();
+
+  private static final int CHUNK_SIZE = 1024;
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  public static QueryFunction getInstance() {
+    return instance;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    try {
+      String[] args = (String[]) context.getArguments();
+      String queryString = args[0];
+      String bucketSet = args[1];
+      InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
+      LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+      boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+      Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString);
+      Object result = partitioned ? query.execute((InternalRegionFunctionContext) context) : query.execute();
+      ResultSender<Object> sender = context.getResultSender();
+      HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null);
+      Iterator<Object> iter = ((SelectResults) result).asList().iterator();
+      while (iter.hasNext()) {
+        Object row = iter.next();
+        DataSerializer.writeObject(row, buf);
+        if (buf.size() > CHUNK_SIZE) {
+          sender.sendResult(buf.toByteArray());
+          logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size());
+          buf.reset();
+        }
+      }
+      sender.lastResult(buf.toByteArray());
+      logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size());
+      buf.reset();
+    }
+    catch(Exception e) {
+      throw new FunctionException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java
new file mode 100644
index 0000000..d5a69f3
--- /dev/null
+++ b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
+
+import java.util.Iterator;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.internal.cache.*;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import com.gemstone.gemfire.internal.cache.execute.InternalResultSender;
+import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * GemFire function that is used by `SparkContext.gemfireRegion(regionPath, whereClause)`
+ * to retrieve region data set for the given bucket set as a RDD partition 
+ **/
+public class RetrieveRegionFunction implements Function {
+
+  public final static String ID = "spark-gemfire-retrieve-region";
+  private static final Logger logger = LogService.getLogger();
+  private static final RetrieveRegionFunction instance = new RetrieveRegionFunction();
+
+  public RetrieveRegionFunction() {
+  }
+
+  /** ------------------------------------------ */
+  /**     interface Function implementation      */
+  /** ------------------------------------------ */
+
+  public static RetrieveRegionFunction getInstance() {
+    return instance;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    String[] args = (String[]) context.getArguments();
+    String where = args[0];
+    String taskDesc = args[1];
+    InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
+    LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+    boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+    if (where.trim().isEmpty())
+      retrieveFullRegion(irfc, partitioned, taskDesc);
+    else
+      retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc);
+  }
+
+  /** ------------------------------------------ */
+  /**    Retrieve region data with where clause  */
+  /** ------------------------------------------ */
+
+  private void retrieveRegionWithWhereClause(
+    InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) {
+    String regionPath = localRegion.getFullPath();
+    String qstr = "select key, value from " + regionPath + ".entries where " + where;
+    logger.info(desc + ": " + qstr);
+
+    try {
+      Cache cache = CacheFactory.getAnyInstance();
+      QueryService queryService = cache.getQueryService();
+      Query query = queryService.newQuery(qstr);
+      SelectResults<Struct> results =
+        (SelectResults<Struct>) (partitioned ?  query.execute(context) : query.execute());
+
+      Iterator<Object[]> entries = getStructIteratorWrapper(results.asList().iterator());
+      InternalResultSender irs = (InternalResultSender) context.getResultSender();
+      StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
+      sender.send();
+    } catch (Exception e) {
+      throw new FunctionException(e);
+    }
+  }
+
+  private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> entries) {
+    return new WrapperIterator<Struct, Iterator<Struct>>(entries) {
+      @Override public Object[] next() {
+        return  delegate.next().getFieldValues();
+      }
+    };
+  }
+
+  /** ------------------------------------------ */
+  /**         Retrieve full region data          */
+  /** ------------------------------------------ */
+
+  private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) {
+    Iterator<Object[]> entries;
+    if (partitioned) {
+      PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>)
+              ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator();
+      // entries = getPREntryIterator(iter);
+      entries = getSimpleEntryIterator(iter);
+    } else {
+      LocalRegion owner = (LocalRegion) context.getDataSet();
+      Iterator<Region.Entry> iter = (Iterator<Region.Entry>) owner.entrySet().iterator();
+      // entries = getRREntryIterator(iter, owner);
+      entries = getSimpleEntryIterator(iter);
+    }
+    InternalResultSender irs = (InternalResultSender) context.getResultSender();
+    StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
+    sender.send();
+  }
+
+//  /** An iterator for partitioned region that uses internal API to get serialized value */
+//  private Iterator<Object[]> getPREntryIterator(PREntriesIterator<Region.Entry> iterator) {
+//    return new WrapperIterator<Region.Entry, PREntriesIterator<Region.Entry>>(iterator) {
+//      @Override public Object[] next() {
+//        Region.Entry entry = delegate.next();
+//        int bucketId = delegate.getBucketId();
+//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId);
+//        // owner needs to be the bucket region not the enclosing partition region
+//        LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId);
+//        Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
+//        return new Object[] {keyInfo.getKey(), value};
+//      }
+//    };
+//  }
+//
+//  /** An iterator for replicated region that uses internal API to get serialized value */
+//  private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> iterator, LocalRegion region) {
+//    final LocalRegion owner = region;
+//    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
+//      @Override public Object[] next() {
+//        Region.Entry entry =  delegate.next();
+//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null);
+//        Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
+//        return new Object[] {keyInfo.getKey(), value};
+//      }
+//    };
+//  }
+
+  // todo. compare performance of regular and simple iterator
+  /** An general iterator for both partitioned and replicated region that returns un-serialized value */
+  private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> iterator) {
+    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
+      @Override public Object[] next() {
+        Region.Entry entry = delegate.next();
+        return new Object[] {entry.getKey(), entry.getValue()};
+      }
+    };
+  }
+
+  /** ------------------------------------------ */
+  /**        abstract wrapper iterator           */
+  /** ------------------------------------------ */
+
+  /** An abstract wrapper iterator to reduce duplicated code of anonymous iterators */
+  abstract class WrapperIterator<T, S extends Iterator<T>> implements Iterator<Object[]> {
+
+    final S delegate;
+
+    protected WrapperIterator(S delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override public boolean hasNext() {
+      return delegate.hasNext();
+    }
+
+    @Override public void remove() { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java
new file mode 100644
index 0000000..de62cc1
--- /dev/null
+++ b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
+
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import io.pivotal.gemfire.spark.connector.internal.RegionMetadata;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This GemFire function retrieve region metadata
+ */
+public class RetrieveRegionMetadataFunction implements Function {
+
+  public final static String ID = "gemfire-spark-retrieve-region-metadata";
+  
+  private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction();
+
+  public RetrieveRegionMetadataFunction() {
+  }
+
+  public static RetrieveRegionMetadataFunction getInstance() {
+    return instance;    
+  }
+  
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet();
+    String regionPath = region.getFullPath();
+    boolean isPartitioned = region.getDataPolicy().withPartitioning();
+    String kTypeName = getTypeClassName(region.getAttributes().getKeyConstraint());
+    String vTypeName = getTypeClassName(region.getAttributes().getValueConstraint());
+
+    RegionMetadata metadata;
+    if (! isPartitioned) {
+      metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, vTypeName);
+    } else {
+      PartitionedRegion pregion = (PartitionedRegion) region;
+      int totalBuckets = pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets();
+      Map<Integer, List<BucketServerLocation66>> bucketMap = pregion.getRegionAdvisor().getAllClientBucketProfiles();
+      HashMap<ServerLocation, HashSet<Integer>> serverMap = bucketServerMap2ServerBucketSetMap(bucketMap);
+      metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, kTypeName, vTypeName);
+    }
+    
+    ResultSender<RegionMetadata> sender = context.getResultSender();
+    sender.lastResult(metadata);
+  }
+  
+  private String getTypeClassName(Class clazz) {
+    return clazz == null ? null : clazz.getCanonicalName();
+  }
+  
+  /** convert bucket to server map to server to bucket set map */
+  private HashMap<ServerLocation, HashSet<Integer>>
+    bucketServerMap2ServerBucketSetMap(Map<Integer, List<BucketServerLocation66>> map) {
+    HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new HashMap<>();
+    for (Integer id : map.keySet()) {
+      List<BucketServerLocation66> locations = map.get(id);
+      for (BucketServerLocation66 location : locations) {
+        ServerLocation server = new ServerLocation(location.getHostName(), location.getPort());
+        if (location.isPrimary()) {
+          HashSet<Integer> set = serverBucketMap.get(server);
+          if (set == null) {
+            set = new HashSet<>();
+            serverBucketMap.put(server, set);
+          }
+          set.add(id);
+          break;
+        }
+      }
+    }
+    return serverBucketMap;    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java
new file mode 100644
index 0000000..793de6a
--- /dev/null
+++ b/geode-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
+import com.gemstone.gemfire.cache.query.types.ObjectType;
+import com.gemstone.gemfire.cache.query.types.StructType;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * StructStreamingResultSender and StructStreamingResultCollector  are paired
+ * to transfer result of list of `com.gemstone.gemfire.cache.query.Struct`
+ * from GemFire server to Spark Connector (the client of GemFire server)
+ * in streaming, i.e., while sender sending the result, the collector can
+ * start processing the arrived result without waiting for full result to
+ * become available.
+ */
+public class StructStreamingResultSender {
+
+  public static final byte TYPE_CHUNK   = 0x30;
+  public static final byte DATA_CHUNK   = 0x31;
+  public static final byte ERROR_CHUNK  = 0x32;
+  public static final byte SER_DATA     = 0x41;
+  public static final byte UNSER_DATA   = 0x42;
+  public static final byte BYTEARR_DATA = 0x43;
+
+  private static ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class);
+  public static StructTypeImpl KeyValueType = new StructTypeImpl(new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField});
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int CHUNK_SIZE = 4096;
+  
+  // Note: The type of ResultSender returned from GemFire FunctionContext is
+  //  always ResultSender<Object>, so can't use ResultSender<byte[]> here
+  private final ResultSender<Object> sender;
+  private final StructType structType;
+  private final Iterator<Object[]> rows;
+  private String desc;
+  private boolean closed = false;
+
+  /**
+   * the Constructor 
+   * @param sender the base ResultSender that send data in byte array
+   * @param type the StructType of result record
+   * @param rows the iterator of the collection of results
+   * @param desc description of this result (used for logging)           
+   */
+  public StructStreamingResultSender(
+    ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, String desc) {
+    if (sender == null || rows == null)
+      throw new NullPointerException("sender=" + sender + ", rows=" + rows);
+    this.sender = sender;
+    this.structType = type;
+    this.rows = rows;
+    this.desc = desc;
+  }
+
+  /** the Constructor with default `desc` */
+  public StructStreamingResultSender(
+          ResultSender<Object> sender, StructType type, Iterator<Object[]> rows) {
+    this(sender, type, rows, "StructStreamingResultSender");
+  }
+  
+  /**
+   * Send the result in chunks. There are 3 types of chunk: TYPE, DATA, and ERROR.
+   * TYPE chunk for sending struct type info, DATA chunk for sending data, and
+   * ERROR chunk for sending exception. There are at most 1 TYPE chunk (omitted
+   * for `KeyValueType`) and 1 ERROR chunk (if there's error), but usually
+   * there are multiple DATA chunks. Each DATA chunk contains multiple rows
+   * of data. The chunk size is determined by the const `CHUNK_SIZE`. If an
+   * exception is thrown, it is serialized and sent as the last chunk  of the 
+   * result (in the form of ERROR chunk).
+   */
+  public void send() {
+    if (closed) throw new RuntimeException("sender is closed.");
+
+    HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null);
+    String dataType = null;
+    int typeSize = 0;
+    int rowCount = 0;
+    int dataSize = 0;            
+    try {
+      if (rows.hasNext()) {
+        // Note: only send type info if there's data with it
+        typeSize = sendType(buf);
+        buf.writeByte(DATA_CHUNK);
+        int rowSize = structType == null ? 2 : structType.getFieldNames().length;
+        while (rows.hasNext()) {
+          rowCount ++;
+          Object[] row = rows.next();          
+          if (rowCount < 2) dataType = entryDataType(row);
+          if (rowSize != row.length) 
+            throw new IOException(rowToString("Expect "  + rowSize + " columns, but got ", row));
+          serializeRowToBuffer(row, buf);
+          if (buf.size() > CHUNK_SIZE) {
+            dataSize += sendBufferredData(buf, false);
+            buf.writeByte(DATA_CHUNK);
+          }
+        }
+      }
+      // send last piece of data or empty byte array
+      dataSize += sendBufferredData(buf, true);
+      logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" +
+                  typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
+                  (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) dataSize)/rowCount)));
+    } catch (IOException | RuntimeException e) {
+      sendException(buf, e);
+    } finally {
+      closed = true;
+    }
+  }
+
+  private String rowToString(String rowDesc, Object[] row) {
+    StringBuilder buf = new StringBuilder();
+    buf.append(rowDesc).append("(");
+    for (int i = 0; i < row.length; i++) buf.append(i ==0 ? "" : " ,").append(row[i]);
+    return buf.append(")") .toString();    
+  }
+
+  private String entryDataType(Object[] row) {
+    StringBuilder buf = new StringBuilder();
+    buf.append("(");
+    for (int i = 0; i < row.length; i++) {
+      if (i != 0) buf.append(", ");
+      buf.append(row[i].getClass().getCanonicalName());
+    }
+    return buf.append(")").toString();
+  }
+  
+  private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException {
+    for (Object data : row) {
+      if (data instanceof CachedDeserializable) {
+        buf.writeByte(SER_DATA);
+        DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf);
+      } else if (data instanceof byte[]) {
+        buf.writeByte(BYTEARR_DATA);
+        DataSerializer.writeByteArray((byte[]) data, buf);
+      } else {
+        buf.writeByte(UNSER_DATA);
+        DataSerializer.writeObject(data, buf);
+      }
+    }
+  }
+  
+  /** return the size of type data */
+  private int sendType(HeapDataOutputStream buf) throws IOException {
+    // logger.info(desc + " struct type: " + structType);
+    if (structType != null) {
+      buf.writeByte(TYPE_CHUNK);
+      DataSerializer.writeObject(structType, buf);
+      return sendBufferredData(buf, false);      
+    } else {
+      return 0;  // default KeyValue type, no type info send
+    }
+  }
+  
+  private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) throws IOException {
+    if (isLast) sender.lastResult(buf.toByteArray());
+    else sender.sendResult(buf.toByteArray());
+    // logData(buf.toByteArray(), desc);
+    int s = buf.size();
+    buf.reset();
+    return s;    
+  }
+
+  /** Send the exception as the last chunk of the result. */
+  private void sendException(HeapDataOutputStream buf, Exception e) {
+    // Note: if exception happens during the serialization, the `buf` may contain
+    // partial serialized data, which may cause de-serialization hang or error.
+    // Therefore, always empty the buffer before sending the exception
+    if (buf.size() > 0) buf.reset();
+    
+    try {
+      buf.writeByte(ERROR_CHUNK);
+      DataSerializer.writeObject(e, buf);
+    } catch (IOException ioe) {
+      logger.error("StructStreamingResultSender failed to send the result:", e);
+      logger.error("StructStreamingResultSender failed to serialize the exception:", ioe);
+      buf.reset();
+    }
+    // Note: send empty chunk as the last result if serialization of exception 
+    // failed, and the error is logged on the GemFire server side.
+    sender.lastResult(buf.toByteArray());
+    // logData(buf.toByteArray(), desc);
+  }
+
+//  private void logData(byte[] data, String desc) {
+//    StringBuilder buf = new StringBuilder();
+//    buf.append(desc);
+//    for (byte b : data) {
+//      buf.append(" ").append(b);
+//    }
+//    logger.info(buf.toString());
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
new file mode 100644
index 0000000..6cb571f
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector;
+
+import java.io.Serializable;
+
+public class Employee implements Serializable {
+
+  private String name;
+
+  private int age;
+
+  public Employee(String n, int a) {
+    name = n;
+    age = a;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getAge() {
+    return age;
+  }
+
+  public String toString() {
+    return new StringBuilder().append("Employee[name=").append(name).
+            append(", age=").append(age).
+            append("]").toString();
+  }
+  
+  public boolean equals(Object o) {
+    if (o instanceof Employee) {
+      return ((Employee) o).name.equals(name) && ((Employee) o).age == age;
+    }
+    return false;
+  }
+
+}
+


Mime
View raw message