From: klund@apache.org
To: commits@geode.apache.org
Date: Tue, 13 Jun 2017 00:29:41 -0000
Subject: [06/50] [abbrv] geode git commit: GEODE-194: Remove spark connector

GEODE-194: Remove spark connector

Remove the spark connector code until it can be updated for the current
Spark release. We should also integrate the build lifecycle and consider
how to extract this into a separate repo.
This closes #558 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b27a79ae Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b27a79ae Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b27a79ae Branch: refs/heads/feature/GEODE-2558 Commit: b27a79ae91943a6ed1426f44dc4709a33eb671eb Parents: 0abaf49 Author: Anthony Baker Authored: Fri May 5 17:00:02 2017 -0700 Committer: Anthony Baker Committed: Wed Jun 7 14:20:31 2017 -0700 ---------------------------------------------------------------------- .gitignore | 2 - geode-assembly/build.gradle | 2 - geode-spark-connector/.gitignore | 1 - geode-spark-connector/README.md | 32 - geode-spark-connector/doc/10_demos.md | 84 --- geode-spark-connector/doc/1_building.md | 35 -- geode-spark-connector/doc/2_quick.md | 178 ------ geode-spark-connector/doc/3_connecting.md | 55 -- geode-spark-connector/doc/4_loading.md | 108 ---- geode-spark-connector/doc/5_rdd_join.md | 237 -------- geode-spark-connector/doc/6_save_rdd.md | 81 --- geode-spark-connector/doc/7_save_dstream.md | 68 --- geode-spark-connector/doc/8_oql.md | 58 -- geode-spark-connector/doc/9_java_api.md | 129 ---- .../connector/internal/RegionMetadata.java | 93 --- .../internal/geodefunctions/QueryFunction.java | 99 --- .../geodefunctions/RetrieveRegionFunction.java | 208 ------- .../RetrieveRegionMetadataFunction.java | 118 ---- .../StructStreamingResultSender.java | 219 ------- .../apache/geode/spark/connector/Employee.java | 54 -- .../spark/connector/JavaApiIntegrationTest.java | 424 ------------- .../apache/geode/spark/connector/Portfolio.java | 109 ---- .../apache/geode/spark/connector/Position.java | 73 --- .../src/it/resources/test-regions.xml | 49 -- .../src/it/resources/test-retrieve-regions.xml | 57 -- .../spark/connector/BasicIntegrationTest.scala | 598 ------------------ .../RDDJoinRegionIntegrationTest.scala | 300 --------- .../RetrieveRegionIntegrationTest.scala | 253 -------- .../apache/geode/spark/connector/package.scala | 29 - .../spark/connector/testkit/GeodeCluster.scala | 47 -- .../spark/connector/testkit/GeodeRunner.scala | 148 ----- .../geode/spark/connector/testkit/IOUtils.scala | 94 --- .../spark/streaming/ManualClockHelper.scala | 28 - .../spark/streaming/TestInputDStream.scala | 44 -- .../javaapi/GeodeJavaDStreamFunctions.java | 86 --- .../javaapi/GeodeJavaPairDStreamFunctions.java | 77 --- .../javaapi/GeodeJavaPairRDDFunctions.java | 238 -------- .../javaapi/GeodeJavaRDDFunctions.java | 178 ------ .../javaapi/GeodeJavaSQLContextFunctions.java | 49 -- .../javaapi/GeodeJavaSparkContextFunctions.java | 87 --- .../spark/connector/javaapi/GeodeJavaUtil.java | 122 ---- .../geode/spark/connector/GeodeConnection.scala | 67 --- .../spark/connector/GeodeConnectionConf.scala | 73 --- .../connector/GeodeConnectionManager.scala | 31 - .../spark/connector/GeodeFunctionDeployer.scala | 81 --- .../spark/connector/GeodeKryoRegistrator.scala | 29 - .../spark/connector/GeodePairRDDFunctions.scala | 140 ----- .../spark/connector/GeodeRDDFunctions.scala | 120 ---- .../connector/GeodeSQLContextFunctions.scala | 42 -- .../connector/GeodeSparkContextFunctions.scala | 39 -- .../internal/DefaultGeodeConnection.scala | 164 ----- .../DefaultGeodeConnectionManager.scala | 77 --- .../connector/internal/LocatorHelper.scala | 136 ----- .../StructStreamingResultCollector.scala | 152 ----- .../connector/internal/oql/QueryParser.scala | 58 -- .../spark/connector/internal/oql/QueryRDD.scala | 83 --- 
.../internal/oql/QueryResultCollector.scala | 69 --- .../connector/internal/oql/RDDConverter.scala | 40 -- .../connector/internal/oql/RowBuilder.scala | 38 -- .../connector/internal/oql/SchemaBuilder.scala | 73 --- .../internal/oql/UndefinedSerializer.scala | 46 -- .../connector/internal/rdd/GeodeJoinRDD.scala | 67 --- .../internal/rdd/GeodeOuterJoinRDD.scala | 69 --- .../internal/rdd/GeodeRDDPartition.scala | 36 -- .../internal/rdd/GeodeRDDPartitioner.scala | 59 -- .../internal/rdd/GeodeRDDPartitionerImpl.scala | 89 --- .../connector/internal/rdd/GeodeRDDWriter.scala | 82 --- .../connector/internal/rdd/GeodeRegionRDD.scala | 138 ----- .../connector/javaapi/GeodeJavaRegionRDD.scala | 26 - .../spark/connector/javaapi/JavaAPIHelper.scala | 53 -- .../apache/geode/spark/connector/package.scala | 69 --- .../streaming/GeodeDStreamFunctions.scala | 89 --- .../spark/connector/streaming/package.scala | 32 - .../geode/spark/connector/JavaAPITest.java | 163 ----- .../connector/GeodeFunctionDeployerTest.scala | 58 -- .../DefaultGeodeConnectionManagerTest.scala | 82 --- ...tStreamingResultSenderAndCollectorTest.scala | 254 -------- .../internal/oql/QueryParserTest.scala | 93 --- .../connector/ConnectorImplicitsTest.scala | 50 -- .../connector/GeodeConnectionConfTest.scala | 100 --- .../connector/GeodeDStreamFunctionsTest.scala | 79 --- .../spark/connector/GeodeRDDFunctionsTest.scala | 139 ----- .../spark/connector/LocatorHelperTest.scala | 168 ------ .../connector/rdd/GeodeRDDPartitionerTest.scala | 190 ------ .../connector/rdd/GeodeRegionRDDTest.scala | 117 ---- .../basic-demos/src/main/java/demo/Emp.java | 95 --- .../src/main/java/demo/OQLJavaDemo.java | 59 -- .../src/main/java/demo/PairRDDSaveJavaDemo.java | 86 --- .../src/main/java/demo/RDDSaveJavaDemo.java | 85 --- .../src/main/java/demo/RegionToRDDJavaDemo.java | 57 -- .../src/main/scala/demo/NetworkWordCount.scala | 75 --- .../project/Dependencies.scala | 45 -- .../project/GeodeSparkBuild.scala | 76 --- geode-spark-connector/project/Settings.scala | 58 -- geode-spark-connector/project/build.properties | 1 - geode-spark-connector/project/plugins.sbt | 8 - geode-spark-connector/sbt | 602 ------------------- geode-spark-connector/scalastyle-config.xml | 117 ---- gradle/rat.gradle | 7 - 99 files changed, 10182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index f83be48..6899907 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,4 @@ out/ *.dat *.rej *.orig -geode-spark-connector/**/target/ -geode-spark-connector/project/project/ geode-pulse/screenshots/ http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-assembly/build.gradle ---------------------------------------------------------------------- diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle index c308d30..39bb542 100755 --- a/geode-assembly/build.gradle +++ b/geode-assembly/build.gradle @@ -275,8 +275,6 @@ distributions { exclude '**/gradle/wrapper/gradle-wrapper.jar' exclude '**/.gradle' exclude '**/build/**' - exclude 'geode-spark-connector/**/target/**' - exclude 'geode-spark-connector/project/project' exclude '**/.project' exclude '**/.classpath' exclude '**/.settings/**' http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/.gitignore ---------------------------------------------------------------------- diff --git 
a/geode-spark-connector/.gitignore b/geode-spark-connector/.gitignore deleted file mode 100644 index ae3c172..0000000 --- a/geode-spark-connector/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/bin/ http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/README.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/README.md b/geode-spark-connector/README.md deleted file mode 100644 index d6e76e8..0000000 --- a/geode-spark-connector/README.md +++ /dev/null @@ -1,32 +0,0 @@ -#Spark Geode Connector - -Spark Geode Connector let's you connect Spark to Geode, expose Geode regions as Spark -RDDs, save Spark RDDs to Geode and execute Geode OQL queries in your Spark applications -and expose the results as DataFrames. - -##Features: - - - Expose Geode region as Spark RDD with Geode server-side filtering - - RDD join and outer join Geode region - - Save Spark RDD to Geode - - Save DStream to Geode - - Execute Geode OQL and return DataFrame - -##Version and Compatibility - -Spark Geode Connector supports Spark 1.3. - -##Documentation - - [Building and testing](doc/1_building.md) - - [Quick start](doc/2_quick.md) - - [Connect to Geode](doc/3_connecting.md) - - [Loading data from Geode](doc/4_loading.md) - - [RDD Join and Outer Join Geode Region](doc/5_rdd_join.md) - - [Saving RDD to Geode](doc/6_save_rdd.md) - - [Saving DStream to Geode](doc/7_save_dstream.md) - - [Geode OQL](doc/8_oql.md) - - [Using Connector in Java](doc/9_java_api.md) - - [About the demos](doc/10_demos.md) - -##License: Apache License 2.0 - http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/10_demos.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/10_demos.md b/geode-spark-connector/doc/10_demos.md deleted file mode 100644 index 4b4777a..0000000 --- a/geode-spark-connector/doc/10_demos.md +++ /dev/null @@ -1,84 +0,0 @@ -## About The Demos -The Spark Geode Connector contains basic demos, as samples, in both Scala -and Java. - - - Read Geode region to Spark as a RDD (`RegionToRDDJavaDemo.java`) - - Write Spark pair RDD to Geode (`PairRDDSaveJavaDemo.java`) - - Write Spark non-pair RDD to Geode (`RDDSaveJavaDemo.java`) - - Read OQL query result as Spark DataFrame (OQLJavaDemo.java) - - Network stateful word count (NetworkWordCount.scala) - -### Requirements -Running the demo requires a Geode Cluster. This can be a one -node or multi-node cluster. 
- -Here are the commands that start a two-node Geode cluster on localhost: -First set up environment variables: -``` -export JAVA_HOME= -export GEODE= -export CONNECTOR= -export CLASSPATH=$CLASSPATH:$GEODE/lib/locator-dependencies.jar:$GEODE/lib/server-dependencies.jar:$GEODE/lib/gfsh-dependencies.jar -export PATH=$PATH:$GEODE/bin -export GF_JAVA=$JAVA_HOME/bin/java - -Now run gfsh and execute the commands: -$ cd -$ mkdir locator server1 server2 -$ gfsh -gfsh> start locator --name=locator -gfsh> start server --name=server1 --server-port=40411 -gfsh> start server --name=server2 --server-port=40412 -``` - -In order to run the Demos, you need to create the following regions -via `gfsh`: -``` -gfsh> create region --name=str_str_region --type=REPLICATE --key-constraint=java.lang.String --value-constraint=java.lang.String -gfsh> create region --name=str_int_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.Integer -``` - -And deploy Geode functions required by the Spark Geode Connector: -``` -gfsh> deploy --jar=/geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar -``` - -### Run simple demos -This section describes how to run `RDDSaveJavaDemo.java`, -`PairRDDSaveJavaDemo.java` and `RegionToRDDJavaDemo.java`: -``` -export SPARK_CLASSPATH=$CONNECTOR/geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jar:$GEODE/lib/server-dependencies.jar - -cd -bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port] - -bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port] - -bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port] -``` - -### Run stateful network word count -This demo shows how to save DStream to Geode. To run the demo, open 3 Terminals: - -**Terminal-1**, start net cat server: -``` -$ nc -lk 9999 -``` - -**Terminal-2**, start word count Spark app: -``` -bin/spark-submit --master=local[2] demo.NetworkWordCount $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port` -``` - -Switch to Terminal-1, type some words, and hit `enter` or `return` key, then check word count at **Terminal-3**, which has `gfsh` connected to the Geode cluster: -``` -gfsh> query --query="select key, value from /str_int_region.entrySet" -``` - -### Shutdown Geode cluster at the end -Use following command to shutdown the Geode cluster after playing with -the demos: -``` -gfsh> shutdown --include-locators=true -``` - http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/1_building.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/1_building.md b/geode-spark-connector/doc/1_building.md deleted file mode 100644 index 74ce034..0000000 --- a/geode-spark-connector/doc/1_building.md +++ /dev/null @@ -1,35 +0,0 @@ -## Building and Testing - -The spark connector is built with Scala 2.10 and sbt 0.13.5 to 0.13.7. - -### Building Artifacts - -To build against Apache Geode, you need to build Geode first and publish the jars -to local repository. 
In the root of Geode directory, run: - -``` -./gradlew clean build install -Dskip.tests=true -``` - -In the root directory of connector project, run: -``` -./sbt clean package -``` - -The following jar files will be created: - - `geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jar` - - `geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar` - - `geode-spark-demos/target/scala-2.10/geode-spark-demos_2.10-0.5.0.jar ` - -### Testing -Commands to run unit and integration tests: -``` -./sbt test // unit tests -./sbt it:test // integration tests -``` - -Integration tests start a Geode cluster and Spark in local mode. -Please make sure you've done following before you run `./sbt it:test`: - - run`./sbt package` - -Next: [Quick Start](2_quick.md) http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/2_quick.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/2_quick.md b/geode-spark-connector/doc/2_quick.md deleted file mode 100644 index 8c2fe4c..0000000 --- a/geode-spark-connector/doc/2_quick.md +++ /dev/null @@ -1,178 +0,0 @@ -## 5 Minutes Quick Start Guide - -In this quick start guide, you will learn how to use Spark shell to test Spark -Geode Connector functionalities. - -### Prerequisites - -Before you start, you should have basic knowledge of Geode and Spark. -Please refer to [Geode Documentation](http://geode.apache.org/docs/) -and [Spark Documentation](https://spark.apache.org/docs/latest/index.html) for -details. If you are new to Geode, this -[Quick Start Guide](http://geode.apache.org/docs/guide/getting_started/15_minute_quickstart_gfsh.html) -is a good starting point. - -You need 2 terminals to follow along, one for Geode shell `gfsh`, and one for Spark shell. Set up Jdk 1.7 on both of them. - -### Geode `gfsh` terminal -In this terminal, start Geode cluster, deploy Spark Geode Connector's geode-function jar, and create demo regions. - -Set up environment variables: -``` -export JAVA_HOME= -export GEODE= -export CONNECTOR= -export CLASSPATH=$CLASSPATH:$GEODE/lib/locator-dependencies.jar:$GEODE/lib/server-dependencies.jar:$GEODE/lib/gfsh-dependencies.jar -export PATH=$PATH:$GEODE/bin -export GF_JAVA=$JAVA_HOME/bin/java -``` - -Start Geode cluster with 1 locator and 2 servers: -``` -gfsh -gfsh>start locator --name=locator1 --port=55221 -gfsh>start server --name=server1 --locators=localhost[55221] --server-port=0 -gfsh>start server --name=server2 --locators=localhost[55221] --server-port=0 -``` - -Then create two demo regions: -``` -gfsh>create region --name=str_str_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.String -gfsh>create region --name=int_str_region --type=PARTITION --key-constraint=java.lang.Integer --value-constraint=java.lang.String -``` - -Deploy Spark Geode Connector's geode-function jar (`geode-functions_2.10-0.5.0.jar`): -``` -gfsh>deploy --jar=/geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar -``` - -### Spark shell terminal -In this terminal, setup Spark environment, and start Spark shell. - -Set Geode locator property in Spark configuration: add -following to `/conf/spark-defaults.conf`: -``` -spark.geode.locators=localhost[55221] -``` -Note: - - if the file doesn't exist, create one. - - replace string `localhost[55221]` with your own locator host and port. 
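If you prefer not to edit `spark-defaults.conf`, the same locator property can be set programmatically when building the `SparkConf` (a minimal sketch; `GeodeLocatorPropKey` is the constant for `spark.geode.locators` used in the connecting guide, and the application name and locator value here are placeholders):
```
import org.apache.spark.SparkConf
import org.apache.geode.spark.connector._

// equivalent to setting spark.geode.locators in spark-defaults.conf;
// replace localhost[55221] with your own locator host and port
val conf = new SparkConf()
  .setAppName("geode-quick-start")
  .setMaster("local[*]")
  .set(GeodeLocatorPropKey, "localhost[55221]")
```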
- -By default, Spark shell output lots of info log, if you want to -turn off info log, change `log4j.rootCategory` to `WARN, console` -in file `/conf/conf/log4j.properties`: -``` -log4j.rootCategory=WARN, console -``` -if file `log4j.properties` doesn't exist, copy `log4j.properties.template` -under the same directory to `log4j.properties` and update the file. - -Start spark-shell: -``` -bin/spark-shell --master local[*] --jars $CONNECTOR/geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jar,$GEODE/lib/server-dependencies.jar -``` - -Check Geode locator property in the Spark shell: -``` -scala> sc.getConf.get("spark.geode.locators") -res0: String = localhost[55221] -``` - -In order to enable Geode specific functions, you need to import -`org.apache.geode.spark.connector._` -``` -scala> import org.apache.geode.spark.connector._ -import org.apache.geode.spark.connector._ -``` - -### Save Pair RDD to Geode -In the Spark shell, create a simple pair RDD and save it to Geode: -``` -scala> val data = Array(("1", "one"), ("2", "two"), ("3", "three")) -data: Array[(String, String)] = Array((1,one), (2,two), (3,three)) - -scala> val distData = sc.parallelize(data) -distData: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at :14 - -scala> distData.saveToGemfire("str_str_region") -15/02/17 07:11:54 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:29, took 0.341288 s -``` - -Verify the data is saved in Geode using `gfsh`: -``` -gfsh>query --query="select key,value from /str_str_region.entries" - -Result : true -startCount : 0 -endCount : 20 -Rows : 3 - -key | value ---- | ----- -1 | one -3 | three -2 | two - -NEXT_STEP_NAME : END -``` - -### Save Non-Pair RDD to Geode -Saving non-pair RDD to Geode requires an extra function that converts each -element of RDD to a key-value pair. Here's sample session in Spark shell: -``` -scala> val data2 = Array("a","ab","abc") -data2: Array[String] = Array(a, ab, abc) - -scala> val distData2 = sc.parallelize(data2) -distData2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at :17 - -scala> distData2.saveToGemfire("int_str_region", e => (e.length, e)) -[info 2015/02/17 12:43:21.174 PST
tid=0x1] -... -15/02/17 12:43:21 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:52, took 0.251194 s -``` - -Verify the result with `gfsh`: -``` -gfsh>query --query="select key,value from /int_str_region.entrySet" - -Result : true -startCount : 0 -endCount : 20 -Rows : 3 - -key | value ---- | ----- -2 | ab -3 | abc -1 | a - -NEXT_STEP_NAME : END -``` - -### Expose Geode Region As RDD -The same API is used to expose both replicated and partitioned region as RDDs. -``` -scala> val rdd = sc.geodeRegion[String, String]("str_str_region") -rdd: org.apache.geode.spark.connector.rdd.GemFireRDD[String,String] = GemFireRDD[2] at RDD at GemFireRDD.scala:19 - -scala> rdd.foreach(println) -(1,one) -(3,three) -(2,two) - -scala> val rdd2 = sc.geodeRegion[Int, String]("int_str_region") -rdd2: org.apache.geode.spark.connector.rdd.GemFireRDD[Int,String] = GemFireRDD[3] at RDD at GemFireRDD.scala:19 - -scala> rdd2.foreach(println) -(2,ab) -(1,a) -(3,abc) -``` -Note: use the right type of region key and value, otherwise you'll get -ClassCastException. - - -Next: [Connecting to Geode](3_connecting.md) - - http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/3_connecting.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/3_connecting.md b/geode-spark-connector/doc/3_connecting.md deleted file mode 100644 index d2d83aa..0000000 --- a/geode-spark-connector/doc/3_connecting.md +++ /dev/null @@ -1,55 +0,0 @@ -## Connecting to Geode - -There are two ways to connect Spark to Geode: - - Specify Geode connection properties via `SparkConf`. - - Specify Geode connection properties via `GeodeConnectionConf`. - -### Specify Geode connection properties via `SparkConf` -The only required Geode connection property is `spark.geode.locators`. -This can be specified in `/conf/spark-defaults.conf` or in Spark -application code. In the following examples, we assume you want to provide -3 extra properties: `security-client-auth-init`, `security-username`, and -`security-password`, note that they are prefixed with `spark.geode.`. - -In `/conf/spark-defaults.com` -``` -spark.geode.locators=192.168.1.47[10334] -spark.geode.security-client-auth-init=org.apache.geode.security.templates.UserPasswordAuthInit.create -spark.geode.security-username=scott -spark.geode.security-password=tiger -``` - -Or in the Spark application code: -``` -import org.apache.geode.spark.connector._ -val sparkConf = new SparkConf() - .set(GeodeLocatorPropKey, "192.168.1.47[10334]") - .set("spark.geode.security-client-auth-init", "org.apache.geode.security.templates.UserPasswordAuthInit.create") - .set("spark.geode.security-username", "scott") - .set("spark.geode.security-password", "tiger") -``` - -After this, you can use all connector APIs without providing `GeodeConnectionConf`. - -### Specify Geode connection properties via `GeodeConnectionConf` -Here's the code that creates `GeodeConnectionConf` with the same set of -properties as the examples above: -``` -val props = Map("security-client-auth-init" -> "org.apache.geode.security.templates.UserPasswordAuthInit.create", - "security-username" -> "scott", - "security-password" -> "tiger") -val connConf = GeodeConnectionConf("192.168.1.47[10334]", props) -``` - -Please note that those properties are **not** prefixed with `spark.geode.`. - -After this, you can use all connector APIs that require `GeodeConnectionConf`. 
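As a quick illustration (a sketch based on the loading and saving sections later in these docs; `sc` and the region `str_str_region` are assumed from the quick start), the `connConf` created above can be passed directly to those APIs:
```
import org.apache.geode.spark.connector._

// expose a region as an RDD using the explicit connection configuration
val rdd = sc.geodeRegion[String, String]("str_str_region", connConf)

// save a pair RDD using the same configuration
val data = sc.parallelize(Seq(("1", "one"), ("2", "two"), ("3", "three")))
data.saveToGeode("str_str_region", connConf)
```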
- -### Notes about locators - - You can specify locator in two formats: `host[port]` or `host:port`. For - example `192.168.1.47[10334]` or `192.168.1.47:10334` - - If your Geode cluster has multiple locators, list them all and separated - by `,`. For example: `host1:10334,host2:10334`. - - -Next: [Loading Data from Geode](4_loading.md) http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/4_loading.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/4_loading.md b/geode-spark-connector/doc/4_loading.md deleted file mode 100644 index fffe6a3..0000000 --- a/geode-spark-connector/doc/4_loading.md +++ /dev/null @@ -1,108 +0,0 @@ -## Loading Data from Geode - -To expose full data set of a Geode region as a Spark -RDD, call `geodeRegion` method on the SparkContext object. - -``` -val rdd = sc.geodeRegion("region path") -``` - -Or with specific `GeodeConectionConf` object instance (see -[Connecting to Geode](3_connecting.md) for how to create GeodeConectionConf): -``` -val rdd = sc.geodeRegion("region path", connConf) -``` - -## Geode RDD Partitions - -Geode has two region types: **replicated**, and -**partitioned** region. Replicated region has full dataset on -each server, while partitioned region has its dataset spanning -upon multiple servers, and may have duplicates for high -availability. - -Since replicated region has its full dataset available on every -server, there is only one RDD partition for a `GeodeRegionRDD` that -represents a replicated region. - -For a `GeodeRegionRDD` that represents a partitioned region, there are -many potential ways to create RDD partitions. So far, we have -implemented ServerSplitsPartitioner, which will split the bucket set -on each Geode server into two RDD partitions by default. -The number of splits is configurable, the following shows how to set -three partitions per Geode server: -``` -import org.apache.geode.spark.connector._ - -val opConf = Map(PreferredPartitionerPropKey -> ServerSplitsPartitionerName, - NumberPartitionsPerServerPropKey -> "3") - -val rdd1 = sc.geodeRegion[String, Int]("str_int_region", opConf = opConf) -// or -val rdd2 = sc.geodeRegion[String, Int]("str_int_region", connConf, opConf) -``` - - -## Geode Server-Side Filtering -Server-side filtering allow exposing partial dataset of a Geode region -as a RDD, this reduces the amount of data transferred from Geode to -Spark to speed up processing. -``` -val rdd = sc.geodeRegion("").where("") -``` - -The above call is translated to OQL query `select key, value from /.entries where `, then -the query is executed for each RDD partition. Note: the RDD partitions are created the same way as described in the -section above. - -In the following demo, javabean class `Emp` is used, it has 5 attributes: `id`, `lname`, `fname`, `age`, and `loc`. -In order to make `Emp` class available on Geode servers, we need to deploy a jar file that contains `Emp` class, -now build the `emp.jar`, deploy it and create region `emps` in `gfsh`: -``` -zip $CONNECTOR/geode-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar \ - -i "demo/Emp.class" --out $CONNECTOR/emp.jar - -gfsh -gfsh> deploy --jar=/emp.jar -gfsh> create region --name=emps --type=PARTITION -``` -Note: The `Emp.class` is availble in `basic-demos_2.10-0.5.0.jar`. But that jar file depends on many scala and spark -classes that are not available on Geode servers' classpath. 
So use the above `zip` command to create a jar file that -only contains `Emp.class`. - -Now in Spark shell, generate some random `Emp` records, and save them to region `emps` (remember to add `emp.jar` to -Spark shell classpath before starting Spark shell): -``` -import org.apache.geode.spark.connector._ -import scala.util.Random -import demo.Emp - -val lnames = List("Smith", "Johnson", "Jones", "Miller", "Wilson", "Taylor", "Thomas", "Lee", "Green", "Parker", "Powell") -val fnames = List("John", "James", "Robert", "Paul", "George", "Kevin", "Jason", "Jerry", "Peter", "Joe", "Alice", "Sophia", "Emma", "Emily") -val locs = List("CA", "WA", "OR", "NY", "FL") -def rpick(xs: List[String]): String = xs(Random.nextInt(xs.size)) - -val d1 = (1 to 20).map(x => new Emp(x, rpick(lnames), rpick(fnames), 20+Random.nextInt(41), rpick(locs))).toArray -val rdd1 = sc.parallelize(d1) -rdd1.saveToGeode("emps", e => (e.getId, e)) -``` - -Now create a RDD that contains all employees whose age is less than 40, and display its contents: -``` -val rdd1s = sc.geodeRegion("emps").where("value.getAge() < 40") - -rdd1s.foreach(println) -(5,Emp(5, Taylor, Robert, 32, FL)) -(14,Emp(14, Smith, Jason, 28, FL)) -(7,Emp(7, Jones, Robert, 26, WA)) -(17,Emp(17, Parker, John, 20, WA)) -(2,Emp(2, Thomas, Emily, 22, WA)) -(10,Emp(10, Lee, Alice, 31, OR)) -(4,Emp(4, Wilson, James, 37, CA)) -(15,Emp(15, Powell, Jason, 34, NY)) -(3,Emp(3, Lee, Sophia, 32, OR)) -(9,Emp(9, Johnson, Sophia, 25, OR)) -(6,Emp(6, Miller, Jerry, 30, NY)) -``` - -Next: [RDD Join and Outer Join Geode Region](5_rdd_join.md) http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/5_rdd_join.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/5_rdd_join.md b/geode-spark-connector/doc/5_rdd_join.md deleted file mode 100644 index 5c0d6c8..0000000 --- a/geode-spark-connector/doc/5_rdd_join.md +++ /dev/null @@ -1,237 +0,0 @@ -## RDD Join and Outer Join Geode Region - -The Spark Geode Connector suports using any RDD as a source -of a join and outer join with a Geode region through APIs -`joinGeodeRegion[K, V]` and `outerJoinGeodeRegion[K, V]`. -Those two APIs execute a single `region.getAll` call for every -partition of the source RDD, so no unnecessary data will be requested -or transferred. This means a join or outer join between any RDD and -a Geode region can be performed without full region scan, and the -source RDD's partitioning and placement for data locality are used. - -Please note that the two type parameters `[K, V]` are the type -of key/value pair of region entries, they need to be specified -to make result RDD has correct type. - -The region `emps` that is created and populated in -[Geode Server-Side Filtering](4_loading.md) will be used in the -following examples. - -### RDD[(K, V1)] join and outer join Region[K, V2] - -In this case, the source RDD is a pair RDD, and it has the same key -type as the Region. Use API `rdd.joinGeodeRegion[K, V2](regionPath)` and -`rdd.outerJoinGeodeRegion[K, V2](regionPath)` do the join and outer -join. 
- -Prepare a source RDD `rdd2`: -``` -val d2 = (11 to 25).map(x => (x, s"message-$x")).toArray -val rdd2 = sc.parallelize(d2) -// print rdd2's content -rdd2.foreach(println) -(11,message-11) -(12,message-12) -(13,message-13) -(14,message-14) -(15,message-15) -(16,message-16) -(17,message-17) -(18,message-18) -(19,message-19) -(20,message-20) -(21,message-21) -(22,message-22) -(23,message-23) -(24,message-24) -(25,message-25) -``` - -Join RDD `rdd2` with region `emps`, and print out the result: -``` -val rdd2j = rdd2.joinGeodeRegion[Int, Emp]("emps") - -rdd2j.foreach(println) -((11,message-11),Emp(11, Taylor, Emma, 44, CA)) -((12,message-12),Emp(12, Taylor, Joe, 60, FL)) -((13,message-13),Emp(13, Lee, Kevin, 50, FL)) -((14,message-14),Emp(14, Smith, Jason, 28, FL)) -((15,message-15),Emp(15, Powell, Jason, 34, NY)) -((16,message-16),Emp(16, Thomas, Alice, 42, OR)) -((17,message-17),Emp(17, Parker, John, 20, WA)) -((18,message-18),Emp(18, Powell, Alice, 58, FL)) -((19,message-19),Emp(19, Taylor, Peter, 46, FL)) -((20,message-20),Emp(20, Green, Peter, 57, CA)) -``` -Note that there's no pairs in the result RDD `rdd2j` corresponding to -the pairs with id from 21 to 25 in RDD `rdd2` since there's no region -entries have those key values. - -Outer join RDD `rdd2` with region `emps`, and print out the result: -``` -val rdd2o = rdd2.outerJoinGeodeRegion[Int, Emp]("emps") - -rdd2o.foreach(println) -((18,message-18),Some(Emp(18, Powell, Alice, 58, FL))) -((19,message-19),Some(Emp(19, Taylor, Peter, 46, FL))) -((11,message-11),Some(Emp(11, Taylor, Emma, 44, CA))) -((12,message-12),Some(Emp(12, Taylor, Joe, 60, FL))) -((20,message-20),Some(Emp(20, Green, Peter, 57, CA))) -((21,message-21),None) -((22,message-22),None) -((23,message-23),None) -((24,message-24),None) -((25,message-25),None) -((13,message-13),Some(Emp(13, Lee, Kevin, 50, FL))) -((14,message-14),Some(Emp(14, Smith, Jason, 28, FL))) -((15,message-15),Some(Emp(15, Powell, Jason, 34, NY))) -((16,message-16),Some(Emp(16, Thomas, Alice, 42, OR))) -((17,message-17),Some(Emp(17, Parker, John, 20, WA))) -``` -Note that there are pairs in the result RDD `rdd2o` corresponding to -the pairs with id from 21 to 25 in the RDD `rdd2`, and values are `None` -since there's no region entries have those key values. - -### RDD[(K1, V1)] join and outer join Region[K2, V2] - -In this case, the source RDD is still a pair RDD, but it has different -key type. Use API `rdd.joinGeodeRegion[K2, V2](regionPath, func)` and -`rdd.outerJoinGeodeRegion[K2, V2](regionPath, func)` do the join and -outer join, where `func` is the function to generate key from (k, v) -pair, the element of source RDD, to join with Geode region. 
- -Prepare a source RDD `d3`: -``` -val d3 = (11 to 25).map(x => (s"message-$x", x)).toArray -val rdd3 = sc.parallelize(d3) -// print rdd3's content -rdd3.foreach(println) -(message-18,18) -(message-19,19) -(message-11,11) -(message-20,20) -(message-21,21) -(message-22,22) -(message-12,12) -(message-23,23) -(message-24,24) -(message-25,25) -(message-13,13) -(message-14,14) -(message-15,15) -(message-16,16) -(message-17,17) -``` - -Join RDD `rdd3` (RDD[(String, Int)] with region `emps` (Region[Int, Emp]), and print out the result: -``` -val rdd3j = rdd3.joinGeodeRegion[Int, Emp]("emps", pair => pair._2) - -rdd3j.foreach(println) -((message-18,18),Emp(18, Powell, Alice, 58, FL)) -((message-19,19),Emp(19, Taylor, Peter, 46, FL)) -((message-20,20),Emp(20, Green, Peter, 57, CA)) -((message-11,11),Emp(11, Taylor, Emma, 44, CA)) -((message-12,12),Emp(12, Taylor, Joe, 60, FL)) -((message-13,13),Emp(13, Lee, Kevin, 50, FL)) -((message-14,14),Emp(14, Smith, Jason, 28, FL)) -((message-15,15),Emp(15, Powell, Jason, 34, NY)) -((message-16,16),Emp(16, Thomas, Alice, 42, OR)) -((message-17,17),Emp(17, Parker, John, 20, WA)) -``` -Note `pair => pair._2` means use the 2nd element of the element of source -RDD and join key. - -Outer join RDD `rdd3` with region `emps`, and print out the result: -``` -val rdd3o = rdd3.outerJoinGeodeRegion[Int, Emp]("emps", pair => pair._2) - -rdd3o.foreach(println) -((message-18,18),Some(Emp(18, Powell, Alice, 58, FL))) -((message-11,11),Some(Emp(11, Taylor, Emma, 44, CA))) -((message-19,19),Some(Emp(19, Taylor, Peter, 46, FL))) -((message-12,12),Some(Emp(12, Taylor, Joe, 60, FL))) -((message-20,20),Some(Emp(20, Green, Peter, 57, CA))) -((message-13,13),Some(Emp(13, Lee, Kevin, 50, FL))) -((message-21,21),None) -((message-14,14),Some(Emp(14, Smith, Jason, 28, FL))) -((message-22,22),None) -((message-23,23),None) -((message-24,24),None) -((message-25,25),None) -((message-15,15),Some(Emp(15, Powell, Jason, 34, NY))) -((message-16,16),Some(Emp(16, Thomas, Alice, 42, OR))) -((message-17,17),Some(Emp(17, Parker, John, 20, WA))) -``` - -### RDD[T] join and outer join Region[K, V] - -Use API `rdd.joinGeodeRegion[K, V](regionPath, func)` and -`rdd.outerJoinGeodeRegion[K, V](regionPath, func)` do the join -and outer join, where `func` is the function to generate key from -`t`, the element of source RDD, to join with Geode region. 
- -Prepare a source RDD `d4`: -``` -val d4 = (11 to 25).map(x => x * 2).toArray -val rdd4 = sc.parallelize(d4) -// print rdd4's content -rdd4.foreach(println) -22 -24 -36 -38 -40 -42 -44 -46 -26 -28 -48 -30 -32 -34 -50 -``` - -Join RDD `d4` with region `emps`, and print out the result: -``` -val rdd4j = rdd4.joinGeodeRegion[Int, Emp]("emps", x => x/2) - -rdd4j.foreach(println) -(22,Emp(11, Taylor, Emma, 44, CA)) -(24,Emp(12, Taylor, Joe, 60, FL)) -(26,Emp(13, Lee, Kevin, 50, FL)) -(28,Emp(14, Smith, Jason, 28, FL)) -(30,Emp(15, Powell, Jason, 34, NY)) -(32,Emp(16, Thomas, Alice, 42, OR)) -(34,Emp(17, Parker, John, 20, WA)) -(36,Emp(18, Powell, Alice, 58, FL)) -(38,Emp(19, Taylor, Peter, 46, FL)) -(40,Emp(20, Green, Peter, 57, CA)) -``` - -Outer join RDD `d4` with region `emps`, and print out the result: -``` -val rdd4o = rdd4.outerJoinGeodeRegion[Int, Emp]("emps", x => x/2) - -rdd4o.foreach(println) -(36,Some(Emp(18, Powell, Alice, 58, FL))) -(38,Some(Emp(19, Taylor, Peter, 46, FL))) -(40,Some(Emp(20, Green, Peter, 57, CA))) -(42,None) -(44,None) -(46,None) -(48,None) -(50,None) -(22,Some(Emp(11, Taylor, Emma, 44, CA))) -(24,Some(Emp(12, Taylor, Joe, 60, FL))) -(26,Some(Emp(13, Lee, Kevin, 50, FL))) -(28,Some(Emp(14, Smith, Jason, 28, FL))) -(30,Some(Emp(15, Powell, Jason, 34, NY))) -(32,Some(Emp(16, Thomas, Alice, 42, OR))) -(34,Some(Emp(17, Parker, John, 20, WA))) -``` - - -Next: [Saving RDD to Geode](6_save_rdd.md) http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/6_save_rdd.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/6_save_rdd.md b/geode-spark-connector/doc/6_save_rdd.md deleted file mode 100644 index 5adc028..0000000 --- a/geode-spark-connector/doc/6_save_rdd.md +++ /dev/null @@ -1,81 +0,0 @@ -## Saving RDD to Geode - -It is possible to save any RDD to a Geode region. The requirements are: - - the object class of the elements contained by the RDD is - (1) available on the classpath of Geode servers - (2) and serializable. - - the target region exists. - -To save an RDD to an existing Geode region, import -`org.apache.geode.spark.connector._` and call the `saveToGeode` -method on RDD. - -### Save RDD[(K, V)] to Geode -For pair RDD, i.e., RDD[(K, V)], the pair is treated as key/value pair. -``` -val data = Array(("1","one"),("2","two"),("3","three")) -val rdd = sc.parallelize(data) -rdd.saveToGeode("str_str_region") -``` - -If you create GeodeConnectionConf as described in -[Connecting to Geode](3_connecting.md), the last statement becomes: -``` -rdd.saveToGeode("str_str_region", connConf) -``` - -You can verify the region contents: -``` -gfsh>query --query="select key, value from /str_str_region.entrySet" - -Result : true -startCount : 0 -endCount : 20 -Rows : 3 - -key | value ---- | ----- -1 | one -3 | three -2 | two -``` - -Note that Geode regions require unique keys, so if the pair RDD -contains duplicated keys, those pairs with the same key are overwriting -each other, and only one of them appears in the final dataset. - -### Save RDD[T] to Geode -To save non-pair RDD to Geode, a function (`f: T => K`) that creates keys -from elements of RDD, and is used to convert RDD element `T` to pair `(f(T), T)`, -then the pair is save to Geode. 
- -``` -val data2 = Array("a","ab","abc") -val rdd2 = sc.parallelize(data2) -rdd2.saveToGeode("str_int_region", e => (e, e.length)) -// or use GeodeConnectionConf object directly -// rdd2.saveToGeode("rgnb", e => (e, e.length), connConf) -``` - -### `rdd.save.batch.size` - -The connector invokes Geode API `putAll()` to save the data. To make -`putAll()` more efficient, the connector invokes putAll() for every -10,000 entries by default. This batch size can be changed with optional -parameter `opConf`. The following shows how to do it: - -``` - // in Scala - rdd.saveToGeode(regionPath, opConf = Map(RDDSaveBatchSizePropKey -> "5000")) - - // in Java - Properties opConf = new Properties(); - opConf.put(RDDSaveBatchSizePropKey, "5000"); - ... - javaFunctions(rdd).saveToGeode(regionPath, opConf); - - // note: RDDSaveBatchSizePropKey = "rdd.save.batch.size" -``` - - -Next: [Saving DStream to Geode](7_save_dstream.md) http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/7_save_dstream.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/7_save_dstream.md b/geode-spark-connector/doc/7_save_dstream.md deleted file mode 100644 index 6cd2c66..0000000 --- a/geode-spark-connector/doc/7_save_dstream.md +++ /dev/null @@ -1,68 +0,0 @@ -## Saving DStream to Geode -Spark Streaming extends the core API to allow high-throughput, fault-tolerant -stream processing of live data streams. Data can be ingested from many -sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. -Results can be stored in Geode. - -### A Simple Spark Streaming App: Stateful Network Word Count - -Create a `StreamingContext` with a `SparkConf` configuration -``` -val ssc = new StreamingContext(sparkConf, Seconds(1)) -``` - -Create a DStream that will connect to net cat server `host:port` -``` -val lines = ssc.socketTextStream(host, port) -``` - -Count each word in each batch -``` -val words = lines.flatMap(_.split(" ")) -val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) -``` - -Use `updateStateByKey` to maintain a running count of each word seen in a text -data stream. Here, the running count is the state and it is an integer. We -define the update function as -``` -val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.foldLeft(0)(_ + _) - val previousCount = state.getOrElse(0) - Some(currentCount + previousCount) -} -``` - -This is applied on a DStream containing words (say, the pairs DStream containing -`(word, 3)` pairs in the earlier example -``` -val runningCounts = wordCounts.updateStateByKey[Int](updateFunction _) -``` - -Print a few of the counts to the console. Start the computation. -``` -runningCounts.print() -ssc.start() -ssc.awaitTermination() // Wait for the computation to terminate -``` - -#### Spark Streaming With Geode -Now let's save the running word count to Geode region `str_int_region`, which -simply replace print() with saveToGeode(): - -``` -import org.apache.geode.spark.connector.streaming._ -runningCounts.saveToGeode("str_int_region") -``` - -You can use the version of saveToGeode that has the parameter `GeodeConnectionConf`: -``` -runningCounts.saveToGeode("str_int_region", connConf) -``` - -See [Spark Streaming Programming Guide] -(http://spark.apache.org/docs/latest/streaming-programming-guide.html) for -more details about Sarpk streaming programming. 
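Putting the snippets above together, a minimal end-to-end sketch of the stateful word count might look like this (the locator, host, and port values are placeholders, and a checkpoint directory is added because `updateStateByKey` requires one):
```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.geode.spark.connector._
import org.apache.geode.spark.connector.streaming._

val sparkConf = new SparkConf()
  .setAppName("NetworkWordCount")
  .set(GeodeLocatorPropKey, "localhost[55221]")  // placeholder locator
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("checkpoint")                     // updateStateByKey needs a checkpoint dir

val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))
val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)

// save the running counts to the Geode region instead of printing them
runningCounts.saveToGeode("str_int_region")

ssc.start()
ssc.awaitTermination()
```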
- - -Next: [Geode OQL](8_oql.md) http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/8_oql.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/8_oql.md b/geode-spark-connector/doc/8_oql.md deleted file mode 100644 index 88456e5..0000000 --- a/geode-spark-connector/doc/8_oql.md +++ /dev/null @@ -1,58 +0,0 @@ -## Geode OQL Query -Spark Geode Connector lets us run Geode OQL queries in Spark applications -to retrieve data from Geode. The query result is a Spark DataFrame. Note -that as of Spark 1.3, SchemaRDD is deprecated. Spark Geode Connector does -not support SchemaRDD. - -An instance of `SQLContext` is required to run OQL query. -``` -val sqlContext = new org.apache.spark.sql.SQLContext(sc) -``` - -Create a `DataFrame` using OQL: -``` -val dataFrame = sqlContext.geodeOQL("SELECT * FROM /CustomerRegion WHERE status = 'active'") -``` - -You can repartition the `DataFrame` using `DataFrame.repartition()` if needed. -Once you have the `DataFrame`, you can register it as a table and use Spark -SQL to query it: -``` -dataFrame.registerTempTable("customer") -val SQLResult = sqlContext.sql("SELECT * FROM customer WHERE id > 100") -``` - -##Serialization -If the OQL query involves User Defined Type (UDT), and the default Java -serializer is used, then the UDT on Geode must implement `java.io.Serializable`. - -If KryoSerializer is preferred, as described in [Spark Documentation] -(https://spark.apache.org/docs/latest/tuning.html), you can configure -`SparkConf` as the following example: -``` -val conf = new SparkConf() - .setAppName("MySparkApp") - .setMaster("local[*]") - .set(GeodeLocatorPropKey, "localhost[55221]") - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.geode.spark.connector.GeodeKryoRegistrator") -``` - -and register the classes (optional) -``` -conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) -``` - -Use the following options to start Spark shell: -``` - --conf spark.serializer=org.apache.spark.serializer.KryoSerializer - --conf spark.kryo.registrator=org.apache.geode.spark.connector.GeodeKryoRegistrator -``` - -## References -[Geode OQL Documentation](http://geode-docs.cfapps.io/docs/developing/querying_basics/chapter_overview.html) - -[Spark SQL Documentation](https://spark.apache.org/docs/latest/sql-programming-guide.html) - - -Next: [Using Connector in Java](9_java_api.md) http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/doc/9_java_api.md ---------------------------------------------------------------------- diff --git a/geode-spark-connector/doc/9_java_api.md b/geode-spark-connector/doc/9_java_api.md deleted file mode 100644 index 21d0605..0000000 --- a/geode-spark-connector/doc/9_java_api.md +++ /dev/null @@ -1,129 +0,0 @@ -## Using Connector in Java -This section describes how to access the functionality of Spark Geode -Connector when you write your Spark applications in Java. It is assumed -that you already familiarized yourself with the previous sections and -understand how the Spark Geode Connector works. - -### Prerequisites -The best way to use the Spark Geode Connector Java API is to statically -import all of the methods in `GeodeJavaUtil`. This utility class is -the main entry point for Spark Geode Connector Java API. 
-``` -import static org.apache.geode.spark.connector.javaapi.GeodeJavaUtil.*; -``` - -Create JavaSparkContext (don't forget about the static import): -``` -SparkConf conf = new SparkConf(); -conf.set(GeodeLocatorPropKey, "192.168.1.47[10334]") -JavaSparkContext jsc = new JavaSparkContext(conf); -``` - -### Accessing Geode region in Java -Geode region is exposed as `GeodeJavaRegionRDD`(subclass of -`JavaPairRDD`): -``` -GeodeJavaRegionRDD rdd1 = javaFunctions(jsc).geodeRegion("emps") -GeodeJavaRegionRDD rdd2 = rdd1.where("value.getAge() < 40"); -``` - -### RDD Join and Outer Join -Use the `rdd3` and region `emps` from [join and outer join examples](5_rdd_join.md): -``` -static class MyKeyFunction implements Function, Integer> { - @Override public Interger call(Tuple2 pair) throws Exception { - return pair._2(); - } -} - -MyKeyFunction func = new MyKeyFunction(); - -JavaPairRDD, Emp> rdd3j = - javaFunction(rdd3).joinGeodeRegion("emps", func); - -JavaPairRDD, Option> rdd3o = - javaFunction(rdd3).outerJoinGeodeRegion("emps", func); - -``` - -### Saving JavaPairRDD to Geode -Saving JavaPairRDD is straightforward: -``` -List> data = new ArrayList<>(); -data.add(new Tuple2<>("7", "seven")); -data.add(new Tuple2<>("8", "eight")); -data.add(new Tuple2<>("9", "nine")); - -// create JavaPairRDD -JavaPairRDD rdd1 = jsc.parallelizePairs(data); -// save to Geode -javaFunctions(rdd1).saveToGeode("str_str_region"); -``` - -In order to save `JavaRDD>`, it needs to be converted to -`JavaPairRDD` via static method `toJavaPairRDD` from `GeodeJavaUtil`: -``` -List> data2 = new ArrayList>(); -data2.add(new Tuple2<>("11", "eleven")); -data2.add(new Tuple2<>("12", "twelve")); -data2.add(new Tuple2<>("13", "thirteen")); - -// create JavaRDD> -JavaRDD> rdd2 = jsc.parallelize(data2); -// save to Geode -javaFunctions(toJavaPairRDD(rdd2)).saveToGeode("str_str_region"); -``` - -### Saving JavaRDD to Geode -Similar to Scala version, a function is required to generate key/value pair -from RDD element. The following `PairFunction` generate a `` -pair from ``: -``` -PairFunction pairFunc = - new PairFunction() { - @Override public Tuple2 call(String s) throws Exception { - return new Tuple2(s, s.length()); - } - }; -``` -Note: there are 3 type parameters for PairFunction, they are: - 1. type of JavaRDD element - 2. type of key of output key/value pair - 3. type of value of output key/value pair - -Once `PairFunction` is ready, the rest is easy: -``` -// create demo JavaRDD -List data = new ArrayList(); -data.add("a"); -data.add("ab"); -data.add("abc"); -JavaRDD jrdd = sc.parallelize(data); - -javaFunctions(rdd).saveToGeode("str_int_region", pairFunc); -``` - -### Saving JavaPairDStream and JavaDStream -Saving JavaPairDStream and JavaDStream is similar to saving JavaPairRDD -jand JavaRDD: -``` -JavaPairDStream ds1 = ... -javaFunctions(ds1).saveToGeode("str_str_region"); - -JavaDStream ds2 = ... -javaFunctions(ds2).saveToGeode("str_int_region", pairFunc); -``` - -### Using Geode OQL - -There are two geodeOQL Java APIs, with and without GeodeConnectionConf. -Here is an example without GeodeConnectionConf, it will use default -GeodeConnectionConf internally. 
-``` -// assume there's jsc: JavaSparkContext -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc); -DataFrame df = javaFunctions(sqlContext).geodeOQL("select * from /str_str_region"); -df.show(); -``` - -Next: [About The Demos] (10_demos.md) http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java deleted file mode 100644 index d9d49e6..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.geode.spark.connector.internal; - -import org.apache.geode.distributed.internal.ServerLocation; - -import java.util.HashMap; -import java.util.HashSet; -import java.io.Serializable; - -/** - * This class contains all info required by GemFire RDD partitioner to create partitions. - */ -public class RegionMetadata implements Serializable { - - private String regionPath; - private boolean isPartitioned; - private int totalBuckets; - private HashMap> serverBucketMap; - private String keyTypeName; - private String valueTypeName; - - /** - * Default constructor. 
- * @param regionPath the full path of the given region - * @param isPartitioned true for partitioned region, false otherwise - * @param totalBuckets number of total buckets for partitioned region, ignored otherwise - * @param serverBucketMap geode server (host:port pair) to bucket set map - * @param keyTypeName region key class name - * @param valueTypeName region value class name - */ - public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap> serverBucketMap, - String keyTypeName, String valueTypeName) { - this.regionPath = regionPath; - this.isPartitioned = isPartitioned; - this.totalBuckets = totalBuckets; - this.serverBucketMap = serverBucketMap; - this.keyTypeName = keyTypeName; - this.valueTypeName = valueTypeName; - } - - public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap> serverBucketMap) { - this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null); - } - - public String getRegionPath() { - return regionPath; - } - - public boolean isPartitioned() { - return isPartitioned; - } - - public int getTotalBuckets() { - return totalBuckets; - } - - public HashMap> getServerBucketMap() { - return serverBucketMap; - } - - public String getKeyTypeName() { - return keyTypeName; - } - - public String getValueTypeName() { - return valueTypeName; - } - - public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append("RegionMetadata(region=").append(regionPath) - .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")") - .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets) - .append(", map=").append(serverBucketMap).append(")"); - return buf.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java deleted file mode 100644 index a6a0910..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.geode.spark.connector.internal.geodefunctions; - -import org.apache.geode.DataSerializer; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.execute.*; -import org.apache.geode.cache.query.SelectResults; -import org.apache.geode.cache.query.Query; -import org.apache.geode.internal.HeapDataOutputStream; -import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; -import org.apache.geode.internal.logging.LogService; -import org.apache.logging.log4j.Logger; -import java.util.Iterator; - -public class QueryFunction implements Function { - - private static final long serialVersionUID = 4866641340803692882L; - - public final static String ID = "geode-spark-query-function"; - - private final static QueryFunction instance = new QueryFunction(); - - private static final Logger logger = LogService.getLogger(); - - private static final int CHUNK_SIZE = 1024; - - @Override - public String getId() { - return ID; - } - - public static QueryFunction getInstance() { - return instance; - } - - @Override - public boolean optimizeForWrite() { - return true; - } - - @Override - public boolean isHA() { - return true; - } - - @Override - public boolean hasResult() { - return true; - } - - @Override - public void execute(FunctionContext context) { - try { - String[] args = (String[]) context.getArguments(); - String queryString = args[0]; - String bucketSet = args[1]; - InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context; - LocalRegion localRegion = (LocalRegion) irfc.getDataSet(); - boolean partitioned = localRegion.getDataPolicy().withPartitioning(); - Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString); - Object result = partitioned ? query.execute((InternalRegionFunctionContext) context) : query.execute(); - ResultSender sender = context.getResultSender(); - HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null); - Iterator iter = ((SelectResults) result).asList().iterator(); - while (iter.hasNext()) { - Object row = iter.next(); - DataSerializer.writeObject(row, buf); - if (buf.size() > CHUNK_SIZE) { - sender.sendResult(buf.toByteArray()); - logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size()); - buf.reset(); - } - } - sender.lastResult(buf.toByteArray()); - logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size()); - buf.reset(); - } - catch(Exception e) { - throw new FunctionException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java deleted file mode 100644 index 096e4d5..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.geode.spark.connector.internal.geodefunctions; - -import java.util.Iterator; -import org.apache.logging.log4j.Logger; - -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.execute.FunctionException; -import org.apache.geode.cache.query.Query; -import org.apache.geode.cache.query.QueryService; -import org.apache.geode.cache.query.SelectResults; -import org.apache.geode.cache.query.Struct; -import org.apache.geode.internal.cache.*; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionContext; -import org.apache.geode.cache.partition.PartitionRegionHelper; -import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; -import org.apache.geode.internal.cache.execute.InternalResultSender; -import org.apache.geode.internal.cache.partitioned.PREntriesIterator; -import org.apache.geode.internal.logging.LogService; - -/** - * GemFire function that is used by `SparkContext.geodeRegion(regionPath, whereClause)` - * to retrieve region data set for the given bucket set as a RDD partition - **/ -public class RetrieveRegionFunction implements Function { - - public final static String ID = "spark-geode-retrieve-region"; - private static final Logger logger = LogService.getLogger(); - private static final RetrieveRegionFunction instance = new RetrieveRegionFunction(); - - public RetrieveRegionFunction() { - } - - /** ------------------------------------------ */ - /** interface Function implementation */ - /** ------------------------------------------ */ - - public static RetrieveRegionFunction getInstance() { - return instance; - } - - @Override - public String getId() { - return ID; - } - - @Override - public boolean hasResult() { - return true; - } - - @Override - public boolean optimizeForWrite() { - return true; - } - - @Override - public boolean isHA() { - return true; - } - - @Override - public void execute(FunctionContext context) { - String[] args = (String[]) context.getArguments(); - String where = args[0]; - String taskDesc = args[1]; - InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context; - LocalRegion localRegion = (LocalRegion) irfc.getDataSet(); - boolean partitioned = localRegion.getDataPolicy().withPartitioning(); - if (where.trim().isEmpty()) - retrieveFullRegion(irfc, partitioned, taskDesc); - else - retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc); - } - - /** ------------------------------------------ */ - /** Retrieve region data with where clause */ - /** ------------------------------------------ */ - - private void retrieveRegionWithWhereClause( - InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) { - 
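// [Editor's illustration, not part of the original file] For a hypothetical region "/emps"
// and a where-clause of "value.age >= 30", the query string assembled below is:
//   select key, value from /emps.entries where value.age >= 30
// i.e. the function always projects (key, value) pairs out of the region's .entries collection,
// executing the query per bucket set on a partitioned region and once on a replicated region.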
String regionPath = localRegion.getFullPath(); - String qstr = "select key, value from " + regionPath + ".entries where " + where; - logger.info(desc + ": " + qstr); - - try { - Cache cache = CacheFactory.getAnyInstance(); - QueryService queryService = cache.getQueryService(); - Query query = queryService.newQuery(qstr); - SelectResults results = - (SelectResults) (partitioned ? query.execute(context) : query.execute()); - - Iterator entries = getStructIteratorWrapper(results.asList().iterator()); - InternalResultSender irs = (InternalResultSender) context.getResultSender(); - StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc); - sender.send(); - } catch (Exception e) { - throw new FunctionException(e); - } - } - - private Iterator getStructIteratorWrapper(Iterator entries) { - return new WrapperIterator>(entries) { - @Override public Object[] next() { - return delegate.next().getFieldValues(); - } - }; - } - - /** ------------------------------------------ */ - /** Retrieve full region data */ - /** ------------------------------------------ */ - - private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) { - Iterator entries; - if (partitioned) { - PREntriesIterator iter = (PREntriesIterator) - ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator(); - // entries = getPREntryIterator(iter); - entries = getSimpleEntryIterator(iter); - } else { - LocalRegion owner = (LocalRegion) context.getDataSet(); - Iterator iter = (Iterator) owner.entrySet().iterator(); - // entries = getRREntryIterator(iter, owner); - entries = getSimpleEntryIterator(iter); - } - InternalResultSender irs = (InternalResultSender) context.getResultSender(); - StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc); - sender.send(); - } - -// /** An iterator for partitioned region that uses internal API to get serialized value */ -// private Iterator getPREntryIterator(PREntriesIterator iterator) { -// return new WrapperIterator>(iterator) { -// @Override public Object[] next() { -// Region.Entry entry = delegate.next(); -// int bucketId = delegate.getBucketId(); -// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId); -// // owner needs to be the bucket region not the enclosing partition region -// LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId); -// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false); -// return new Object[] {keyInfo.getKey(), value}; -// } -// }; -// } -// -// /** An iterator for replicated region that uses internal API to get serialized value */ -// private Iterator getRREntryIterator(Iterator iterator, LocalRegion region) { -// final LocalRegion owner = region; -// return new WrapperIterator>(iterator) { -// @Override public Object[] next() { -// Region.Entry entry = delegate.next(); -// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null); -// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false); -// return new Object[] {keyInfo.getKey(), value}; -// } -// }; -// } - - // todo. 
compare performance of regular and simple iterator - /** An general iterator for both partitioned and replicated region that returns un-serialized value */ - private Iterator getSimpleEntryIterator(Iterator iterator) { - return new WrapperIterator>(iterator) { - @Override public Object[] next() { - Region.Entry entry = delegate.next(); - return new Object[] {entry.getKey(), entry.getValue()}; - } - }; - } - - /** ------------------------------------------ */ - /** abstract wrapper iterator */ - /** ------------------------------------------ */ - - /** An abstract wrapper iterator to reduce duplicated code of anonymous iterators */ - abstract class WrapperIterator> implements Iterator { - - final S delegate; - - protected WrapperIterator(S delegate) { - this.delegate = delegate; - } - - @Override public boolean hasNext() { - return delegate.hasNext(); - } - - @Override public void remove() { } - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java deleted file mode 100644 index 646bc3e..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
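[Editor's illustration] The abstract WrapperIterator above exists only to factor out the hasNext()/remove() boilerplate of RetrieveRegionFunction's anonymous adapters, so each adapter overrides next() alone to turn a Region.Entry into an Object[] {key, value} row. A standalone sketch of the same pattern with my own names and assumed generics (the type parameters were elided in this archived diff):

import java.util.Iterator;
import java.util.Map;

/** Adapt a Map.Entry iterator into the Object[] {key, value} rows the result sender expects. */
abstract class AdapterIterator<T, S extends Iterator<?>> implements Iterator<T> {
  final S delegate;
  AdapterIterator(S delegate) { this.delegate = delegate; }
  @Override public boolean hasNext() { return delegate.hasNext(); }
  @Override public void remove() { /* read-only view */ }
}

class EntryToPair {
  static Iterator<Object[]> wrap(final Iterator<Map.Entry<Object, Object>> entries) {
    return new AdapterIterator<Object[], Iterator<Map.Entry<Object, Object>>>(entries) {
      @Override public Object[] next() {
        Map.Entry<Object, Object> e = delegate.next();
        return new Object[] {e.getKey(), e.getValue()};  // same shape as getSimpleEntryIterator()
      }
    };
  }
}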
- */ -package org.apache.geode.spark.connector.internal.geodefunctions; - -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionContext; -import org.apache.geode.cache.execute.ResultSender; -import org.apache.geode.distributed.internal.ServerLocation; -import org.apache.geode.internal.cache.BucketServerLocation66; -import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; -import org.apache.geode.spark.connector.internal.RegionMetadata; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -/** - * This GemFire function retrieve region metadata - */ -public class RetrieveRegionMetadataFunction implements Function { - - public final static String ID = "geode-spark-retrieve-region-metadata"; - - private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction(); - - public RetrieveRegionMetadataFunction() { - } - - public static RetrieveRegionMetadataFunction getInstance() { - return instance; - } - - @Override - public String getId() { - return ID; - } - - @Override - public boolean optimizeForWrite() { - return false; - } - - @Override - public boolean isHA() { - return true; - } - - @Override - public boolean hasResult() { - return true; - } - - @Override - public void execute(FunctionContext context) { - LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet(); - String regionPath = region.getFullPath(); - boolean isPartitioned = region.getDataPolicy().withPartitioning(); - String kTypeName = getTypeClassName(region.getAttributes().getKeyConstraint()); - String vTypeName = getTypeClassName(region.getAttributes().getValueConstraint()); - - RegionMetadata metadata; - if (! isPartitioned) { - metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, vTypeName); - } else { - PartitionedRegion pregion = (PartitionedRegion) region; - int totalBuckets = pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets(); - Map> bucketMap = pregion.getRegionAdvisor().getAllClientBucketProfiles(); - HashMap> serverMap = bucketServerMap2ServerBucketSetMap(bucketMap); - metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, kTypeName, vTypeName); - } - - ResultSender sender = context.getResultSender(); - sender.lastResult(metadata); - } - - private String getTypeClassName(Class clazz) { - return clazz == null ? 
null : clazz.getCanonicalName(); - } - - /** convert bucket to server map to server to bucket set map */ - private HashMap> - bucketServerMap2ServerBucketSetMap(Map> map) { - HashMap> serverBucketMap = new HashMap<>(); - for (Integer id : map.keySet()) { - List locations = map.get(id); - for (BucketServerLocation66 location : locations) { - ServerLocation server = new ServerLocation(location.getHostName(), location.getPort()); - if (location.isPrimary()) { - HashSet set = serverBucketMap.get(server); - if (set == null) { - set = new HashSet<>(); - serverBucketMap.put(server, set); - } - set.add(id); - break; - } - } - } - return serverBucketMap; - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java deleted file mode 100644 index cd086e0..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.geode.spark.connector.internal.geodefunctions; - -import org.apache.geode.DataSerializer; -import org.apache.geode.cache.execute.ResultSender; -import org.apache.geode.cache.query.internal.types.ObjectTypeImpl; -import org.apache.geode.cache.query.internal.types.StructTypeImpl; -import org.apache.geode.cache.query.types.ObjectType; -import org.apache.geode.cache.query.types.StructType; -import org.apache.geode.internal.HeapDataOutputStream; -import org.apache.geode.internal.cache.CachedDeserializable; -import org.apache.geode.internal.logging.LogService; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.Iterator; - -/** - * StructStreamingResultSender and StructStreamingResultCollector are paired - * to transfer result of list of `org.apache.geode.cache.query.Struct` - * from GemFire server to Spark Connector (the client of GemFire server) - * in streaming, i.e., while sender sending the result, the collector can - * start processing the arrived result without waiting for full result to - * become available. 
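[Editor's illustration] RetrieveRegionMetadataFunction, removed just above, answers with a single RegionMetadata, inverting the bucket-to-servers profiles so that each primary server maps to the set of bucket ids it hosts. A hedged sketch of how a client could invoke it once the function is registered on the servers; the Region handle and result handling are illustrative, not the connector's actual code path.

import java.util.List;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.spark.connector.internal.RegionMetadata;
import org.apache.geode.spark.connector.internal.geodefunctions.RetrieveRegionMetadataFunction;

public class MetadataLookupSketch {
  @SuppressWarnings("unchecked")
  static RegionMetadata lookup(Region<?, ?> region) {
    Execution exec = FunctionService.onRegion(region);
    ResultCollector<?, ?> rc = exec.execute(RetrieveRegionMetadataFunction.ID);
    List<Object> results = (List<Object>) rc.getResult();
    return (RegionMetadata) results.get(0);   // first (typically only) metadata object returned
  }
}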
- */ -public class StructStreamingResultSender { - - public static final byte TYPE_CHUNK = 0x30; - public static final byte DATA_CHUNK = 0x31; - public static final byte ERROR_CHUNK = 0x32; - public static final byte SER_DATA = 0x41; - public static final byte UNSER_DATA = 0x42; - public static final byte BYTEARR_DATA = 0x43; - - private static ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class); - public static StructTypeImpl KeyValueType = new StructTypeImpl(new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField}); - - private static final Logger logger = LogService.getLogger(); - private static final int CHUNK_SIZE = 4096; - - // Note: The type of ResultSender returned from GemFire FunctionContext is - // always ResultSender, so can't use ResultSender here - private final ResultSender sender; - private final StructType structType; - private final Iterator rows; - private String desc; - private boolean closed = false; - - /** - * the Constructor - * @param sender the base ResultSender that send data in byte array - * @param type the StructType of result record - * @param rows the iterator of the collection of results - * @param desc description of this result (used for logging) - */ - public StructStreamingResultSender( - ResultSender sender, StructType type, Iterator rows, String desc) { - if (sender == null || rows == null) - throw new NullPointerException("sender=" + sender + ", rows=" + rows); - this.sender = sender; - this.structType = type; - this.rows = rows; - this.desc = desc; - } - - /** the Constructor with default `desc` */ - public StructStreamingResultSender( - ResultSender sender, StructType type, Iterator rows) { - this(sender, type, rows, "StructStreamingResultSender"); - } - - /** - * Send the result in chunks. There are 3 types of chunk: TYPE, DATA, and ERROR. - * TYPE chunk for sending struct type info, DATA chunk for sending data, and - * ERROR chunk for sending exception. There are at most 1 TYPE chunk (omitted - * for `KeyValueType`) and 1 ERROR chunk (if there's error), but usually - * there are multiple DATA chunks. Each DATA chunk contains multiple rows - * of data. The chunk size is determined by the const `CHUNK_SIZE`. If an - * exception is thrown, it is serialized and sent as the last chunk of the - * result (in the form of ERROR chunk). - */ - public void send() { - if (closed) throw new RuntimeException("sender is closed."); - - HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null); - String dataType = null; - int typeSize = 0; - int rowCount = 0; - int dataSize = 0; - try { - if (rows.hasNext()) { - // Note: only send type info if there's data with it - typeSize = sendType(buf); - buf.writeByte(DATA_CHUNK); - int rowSize = structType == null ? 2 : structType.getFieldNames().length; - while (rows.hasNext()) { - rowCount ++; - Object[] row = rows.next(); - if (rowCount < 2) dataType = entryDataType(row); - if (rowSize != row.length) - throw new IOException(rowToString("Expect " + rowSize + " columns, but got ", row)); - serializeRowToBuffer(row, buf); - if (buf.size() > CHUNK_SIZE) { - dataSize += sendBufferredData(buf, false); - buf.writeByte(DATA_CHUNK); - } - } - } - // send last piece of data or empty byte array - dataSize += sendBufferredData(buf, true); - logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" + - typeSize + ", data.size=" + dataSize + ", row.avg.size=" + - (rowCount == 0 ? 
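[Editor's illustration] The send() contract described above frames every transmission as TYPE, DATA, and ERROR chunks keyed by the one-byte markers defined at the top of this class. A simplified, hypothetical sketch of the receiving side; the real StructStreamingResultCollector additionally handles the per-column SER_DATA/UNSER_DATA/BYTEARR_DATA markers inside a DATA chunk and the empty terminal chunk.

// Editor's sketch only; `readRows` is a hypothetical helper, not part of the connector.
private Object readChunk(java.io.DataInput in) throws Exception {
  byte marker = in.readByte();
  if (marker == StructStreamingResultSender.TYPE_CHUNK) {        // 0x30: a serialized StructType follows
    return DataSerializer.readObject(in);
  } else if (marker == StructStreamingResultSender.DATA_CHUNK) { // 0x31: serialized rows follow until the chunk ends
    return readRows(in);
  } else {                                                       // ERROR_CHUNK, 0x32: rethrow the server-side failure
    throw new RuntimeException((Exception) DataSerializer.readObject(in));
  }
}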
"NaN" : String.format("%.1f", ((float) dataSize)/rowCount))); - } catch (IOException | RuntimeException e) { - sendException(buf, e); - } finally { - closed = true; - } - } - - private String rowToString(String rowDesc, Object[] row) { - StringBuilder buf = new StringBuilder(); - buf.append(rowDesc).append("("); - for (int i = 0; i < row.length; i++) buf.append(i ==0 ? "" : " ,").append(row[i]); - return buf.append(")") .toString(); - } - - private String entryDataType(Object[] row) { - StringBuilder buf = new StringBuilder(); - buf.append("("); - for (int i = 0; i < row.length; i++) { - if (i != 0) buf.append(", "); - buf.append(row[i].getClass().getCanonicalName()); - } - return buf.append(")").toString(); - } - - private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException { - for (Object data : row) { - if (data instanceof CachedDeserializable) { - buf.writeByte(SER_DATA); - DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf); - } else if (data instanceof byte[]) { - buf.writeByte(BYTEARR_DATA); - DataSerializer.writeByteArray((byte[]) data, buf); - } else { - buf.writeByte(UNSER_DATA); - DataSerializer.writeObject(data, buf); - } - } - } - - /** return the size of type data */ - private int sendType(HeapDataOutputStream buf) throws IOException { - // logger.info(desc + " struct type: " + structType); - if (structType != null) { - buf.writeByte(TYPE_CHUNK); - DataSerializer.writeObject(structType, buf); - return sendBufferredData(buf, false); - } else { - return 0; // default KeyValue type, no type info send - } - } - - private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) throws IOException { - if (isLast) sender.lastResult(buf.toByteArray()); - else sender.sendResult(buf.toByteArray()); - // logData(buf.toByteArray(), desc); - int s = buf.size(); - buf.reset(); - return s; - } - - /** Send the exception as the last chunk of the result. */ - private void sendException(HeapDataOutputStream buf, Exception e) { - // Note: if exception happens during the serialization, the `buf` may contain - // partial serialized data, which may cause de-serialization hang or error. - // Therefore, always empty the buffer before sending the exception - if (buf.size() > 0) buf.reset(); - - try { - buf.writeByte(ERROR_CHUNK); - DataSerializer.writeObject(e, buf); - } catch (IOException ioe) { - logger.error("StructStreamingResultSender failed to send the result:", e); - logger.error("StructStreamingResultSender failed to serialize the exception:", ioe); - buf.reset(); - } - // Note: send empty chunk as the last result if serialization of exception - // failed, and the error is logged on the GemFire server side. 
- sender.lastResult(buf.toByteArray()); - // logData(buf.toByteArray(), desc); - } - -// private void logData(byte[] data, String desc) { -// StringBuilder buf = new StringBuilder(); -// buf.append(desc); -// for (byte b : data) { -// buf.append(" ").append(b); -// } -// logger.info(buf.toString()); -// } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Employee.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Employee.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Employee.java deleted file mode 100644 index 180e632..0000000 --- a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Employee.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.org.apache.geode.spark.connector; - -import java.io.Serializable; - -public class Employee implements Serializable { - - private String name; - - private int age; - - public Employee(String n, int a) { - name = n; - age = a; - } - - public String getName() { - return name; - } - - public int getAge() { - return age; - } - - public String toString() { - return new StringBuilder().append("Employee[name=").append(name). - append(", age=").append(age). - append("]").toString(); - } - - public boolean equals(Object o) { - if (o instanceof Employee) { - return ((Employee) o).name.equals(name) && ((Employee) o).age == age; - } - return false; - } - -} -
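[Editor's note] One small observation on the test bean above: Employee overrides equals() but not hashCode(), so equal instances could land in different hash buckets if the class were ever used as a HashMap/HashSet key. Should this code be resurrected later, a hashCode() consistent with that equals() could be as simple as:

@Override
public int hashCode() {
  return java.util.Objects.hash(name, age);   // consistent with equals(): same name and age
}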