From: dschneider@apache.org
To: commits@geode.incubator.apache.org
Date: Mon, 06 Jul 2015 21:27:17 -0000
Message-Id: <5b7a077c15c34118aad60c4000b5adee@git.apache.org>
In-Reply-To: <1367993cf1c24bbea023c2a9c86dc3be@git.apache.org>
References: <1367993cf1c24bbea023c2a9c86dc3be@git.apache.org>
Subject: [5/5] incubator-geode git commit: GEODE-9: Imported gemfire-spark-connector from geode-1.0.0-SNAPSHOT-2.src.tar

GEODE-9: Imported gemfire-spark-connector from geode-1.0.0-SNAPSHOT-2.src.tar

Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/85d44f04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/85d44f04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/85d44f04

Branch: refs/heads/feature/GEODE-9
Commit: 85d44f046bfcf34a835141044f99259e2539ae9d
Parents: 11d0367
Author: Darrel Schneider
Authored: Mon Jul 6 10:39:00 2015 -0700
Committer: Darrel Schneider
Committed: Mon Jul 6 11:32:53 2015 -0700

----------------------------------------------------------------------
 gemfire-spark-connector/.gitignore | 1 +
 gemfire-spark-connector/README.md | 45 ++
gemfire-spark-connector/doc/10_demos.md | 84 +++ gemfire-spark-connector/doc/1_building.md | 29 + gemfire-spark-connector/doc/2_quick.md | 178 ++++++ gemfire-spark-connector/doc/3_connecting.md | 55 ++ gemfire-spark-connector/doc/4_loading.md | 101 ++++ gemfire-spark-connector/doc/5_rdd_join.md | 237 ++++++++ gemfire-spark-connector/doc/6_save_rdd.md | 61 ++ gemfire-spark-connector/doc/7_save_dstream.md | 68 +++ gemfire-spark-connector/doc/8_oql.md | 60 ++ gemfire-spark-connector/doc/9_java_api.md | 129 ++++ .../connector/internal/RegionMetadata.java | 77 +++ .../gemfirefunctions/QueryFunction.java | 83 +++ .../RetrieveRegionFunction.java | 192 ++++++ .../RetrieveRegionMetadataFunction.java | 102 ++++ .../StructStreamingResultSender.java | 203 +++++++ .../gemfire/spark/connector/Employee.java | 38 ++ .../spark/connector/JavaApiIntegrationTest.java | 368 ++++++++++++ .../gemfire/spark/connector/Portfolio.java | 93 +++ .../gemfire/spark/connector/Position.java | 57 ++ .../src/it/resources/test-regions.xml | 32 + .../src/it/resources/test-retrieve-regions.xml | 40 ++ .../spark/connector/BasicIntegrationTest.scala | 582 +++++++++++++++++++ .../RDDJoinRegionIntegrationTest.scala | 284 +++++++++ .../RetrieveRegionIntegrationTest.scala | 237 ++++++++ .../gemfire/spark/connector/package.scala | 13 + .../connector/testkit/GemFireCluster.scala | 31 + .../spark/connector/testkit/GemFireRunner.scala | 136 +++++ .../spark/connector/testkit/IOUtils.scala | 78 +++ .../spark/streaming/ManualClockHelper.scala | 12 + .../spark/streaming/TestInputDStream.scala | 28 + .../javaapi/GemFireJavaDStreamFunctions.java | 44 ++ .../GemFireJavaPairDStreamFunctions.java | 39 ++ .../javaapi/GemFireJavaPairRDDFunctions.java | 203 +++++++ .../javaapi/GemFireJavaRDDFunctions.java | 136 +++++ .../javaapi/GemFireJavaSQLContextFunctions.java | 33 ++ .../GemFireJavaSparkContextFunctions.java | 71 +++ .../connector/javaapi/GemFireJavaUtil.java | 105 ++++ .../spark/connector/GemFireConnection.scala | 51 ++ .../spark/connector/GemFireConnectionConf.scala | 57 ++ .../connector/GemFireConnectionManager.scala | 15 + .../connector/GemFireFunctionDeployer.scala | 65 +++ .../connector/GemFireKryoRegistrator.scala | 13 + .../connector/GemFirePairRDDFunctions.scala | 117 ++++ .../spark/connector/GemFireRDDFunctions.scala | 93 +++ .../connector/GemFireSQLContextFunctions.scala | 26 + .../GemFireSparkContextFunctions.scala | 23 + .../internal/DefaultGemFireConnection.scala | 126 ++++ .../DefaultGemFireConnectionManager.scala | 51 ++ .../connector/internal/LocatorHelper.scala | 30 + .../StructStreamingResultCollector.scala | 136 +++++ .../connector/internal/oql/QueryParser.scala | 42 ++ .../spark/connector/internal/oql/QueryRDD.scala | 67 +++ .../internal/oql/QueryResultCollector.scala | 53 ++ .../connector/internal/oql/RDDConverter.scala | 24 + .../connector/internal/oql/RowBuilder.scala | 22 + .../connector/internal/oql/SchemaBuilder.scala | 57 ++ .../internal/oql/UndefinedSerializer.scala | 30 + .../connector/internal/rdd/GemFireJoinRDD.scala | 51 ++ .../internal/rdd/GemFireOuterJoinRDD.scala | 53 ++ .../internal/rdd/GemFireRDDPartition.scala | 20 + .../internal/rdd/GemFireRDDPartitioner.scala | 43 ++ .../rdd/GemFireRDDPartitionerImpl.scala | 73 +++ .../internal/rdd/GemFireRDDWriter.scala | 55 ++ .../internal/rdd/GemFireRegionRDD.scala | 122 ++++ .../javaapi/GemFireJavaRegionRDD.scala | 10 + .../spark/connector/javaapi/JavaAPIHelper.scala | 35 ++ .../gemfire/spark/connector/package.scala | 43 ++ 
.../streaming/GemFireDStreamFunctions.scala | 61 ++ .../spark/connector/streaming/package.scala | 16 + .../gemfire/spark/connector/JavaAPITest.java | 147 +++++ .../connector/GemFireFunctionDeployerTest.scala | 42 ++ .../DefaultGemFireConnectionManagerTest.scala | 66 +++ ...tStreamingResultSenderAndCollectorTest.scala | 238 ++++++++ .../internal/oql/QueryParserTest.scala | 67 +++ .../connector/ConnectorImplicitsTest.scala | 34 ++ .../connector/GemFireConnectionConfTest.scala | 84 +++ .../connector/GemFireDStreamFunctionsTest.scala | 63 ++ .../connector/GemFireRDDFunctionsTest.scala | 99 ++++ .../spark/connector/LocatorHelperTest.scala | 75 +++ .../rdd/GemFireRDDPartitionerTest.scala | 174 ++++++ .../connector/rdd/GemFireRegionRDDTest.scala | 101 ++++ .../basic-demos/src/main/java/demo/Emp.java | 79 +++ .../src/main/java/demo/OQLJavaDemo.java | 43 ++ .../src/main/java/demo/PairRDDSaveJavaDemo.java | 70 +++ .../src/main/java/demo/RDDSaveJavaDemo.java | 69 +++ .../src/main/java/demo/RegionToRDDJavaDemo.java | 41 ++ .../src/main/scala/demo/NetworkWordCount.scala | 59 ++ .../project/Dependencies.scala | 29 + .../project/GemFireSparkBuild.scala | 60 ++ gemfire-spark-connector/project/Settings.scala | 42 ++ .../project/build.properties | 1 + gemfire-spark-connector/project/plugins.sbt | 8 + gemfire-spark-connector/scalastyle-config.xml | 117 ++++ 95 files changed, 7853 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/.gitignore ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/.gitignore b/gemfire-spark-connector/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/gemfire-spark-connector/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/README.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/README.md b/gemfire-spark-connector/README.md new file mode 100644 index 0000000..125cef7 --- /dev/null +++ b/gemfire-spark-connector/README.md @@ -0,0 +1,45 @@ +#Spark GemFire Connector + +Note: GemFire is now an open source project [Geode](http://projectgeode.org). + +Spark GemFire Connector let's you connect Spark to GemFire, expose GemFire regions as Spark +RDDs, save Spark RDDs to GemFire and execute GemFire OQL queries in your Spark applications +and expose results as DataFrames. + +##Features: + - Expose GemFire region as Spark RDD with GemFire server-side filtering + - RDD join and outer join GemFire region + - Save Spark RDD to GemFire + - Save DStream to GemFire + - Execute GemFire OQL and return DataFrame + +##Version and Compatibility +| Connector | Spark | GemFire | Geode | +|-----------|-------|---------|-------| +| 0.5 | 1.3 | 9.0 | ? | + +##Download +TBD + +##Documentation + - [Building and testing](doc/1_building.md) + - [Quick start](doc/2_quick.md) + - [Connect to GemFire](doc/3_connecting.md) + - [Loading data from GemFire](doc/4_loading.md) + - [RDD Join and Outer Join GemFire Region](doc/5_rdd_join.md) + - [Saving RDD to GemFire](doc/6_save_rdd.md) + - [Saving DStream to GemFire](doc/7_save_dstream.md) + - [GemFire OQL](doc/8_oql.md) + - [Using Connector in Java](doc/9_java_api.md) + - [About the demos](doc/10_demos.md) + - [Logging](doc/logging.md) ??? + - [Security] (doc/security.md) ??? 
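+
+For a quick feel of the APIs listed above, here is a minimal Scala sketch run from the Spark shell (`sc`); it assumes a running GemFire cluster and the `str_str_region` demo region from the [Quick start](doc/2_quick.md) — see the documents above for details:
+```
+import io.pivotal.gemfire.spark.connector._
+
+// expose a GemFire region as a Spark pair RDD
+val rdd = sc.gemfireRegion[String, String]("str_str_region")
+
+// save a Spark pair RDD to the same GemFire region
+sc.parallelize(Seq(("1", "one"), ("2", "two"))).saveToGemfire("str_str_region")
+```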
+ + +##Community: Reporting bugs, mailing list, contributing + + (TBD) + +##License: Apache License 2.0 + + (TBD) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/10_demos.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/10_demos.md b/gemfire-spark-connector/doc/10_demos.md new file mode 100644 index 0000000..ef4ef86 --- /dev/null +++ b/gemfire-spark-connector/doc/10_demos.md @@ -0,0 +1,84 @@ +## About The Demos +The Spark GemFire Connector contains basic demos, as samples, in both Scala +and Java. + + - Read GemFire region to Spark as a RDD (`RegionToRDDJavaDemo.java`) + - Write Spark pair RDD to GemFire (`PairRDDSaveJavaDemo.java`) + - Write Spark non-pair RDD to GemFire (`RDDSaveJavaDemo.java`) + - Read OQL query result as Spark DataFrame (OQLJavaDemo.java) + - Network stateful word count (NetworkWordCount.scala) + +### Requirements +Running the demo requires a GemFire Cluster. This can be a one +node or multi-node cluster. + +Here are the commands that start a two-node GemFire cluster on localhost: +First set up environment variables: +``` +export JAVA_HOME= +export GEMFIRE= +export CONNECTOR= +export CLASSPATH=$CLASSPATH:$GEMFIRE/lib/locator-dependencies.jar:$GEMFIRE/lib/server-dependencies.jar:$GEMFIRE/lib/gfsh-dependencies.jar +export PATH=$PATH:$GEMFIRE/bin +export GF_JAVA=$JAVA_HOME/bin/java + +Now run gfsh and execute the commands: +$ cd +$ mkdir locator server1 server2 +$ gfsh +gfsh> start locator --name=locator +gfsh> start server --name=server1 --server-port=40411 +gfsh> start server --name=server2 --server-port=40412 +``` + +In order to run the Demos, you need to create the following regions +via `gfsh`: +``` +gfsh> create region --name=str_str_region --type=REPLICATE --key-constraint=java.lang.String --value-constraint=java.lang.String +gfsh> create region --name=str_int_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.Integer +``` + +And deploy GemFire functions required by the Spark GemFire Connector: +``` +gfsh> deploy --jar=/gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar +``` + +### Run simple demos +This section describes how to run `RDDSaveJavaDemo.java`, +`PairRDDSaveJavaDemo.java` and `RegionToRDDJavaDemo.java`: +``` +export SPARK_CLASSPATH=$CONNECTOR/gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar:$GEMFIRE/lib/server-dependencies.jar + +cd +bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port] + +bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port] + +bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar locatorHost[port] +``` + +### Run stateful network word count +This demo shows how to save DStream to GemFire. 
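+
+The heart of the demo is a single `saveToGemfire` call on the word-count DStream (a sketch only, not the full demo source; it assumes the DStream of (word, count) pairs is named `runningCounts`, as in [Saving DStream to GemFire](7_save_dstream.md)):
+```
+import io.pivotal.gemfire.spark.connector.streaming._
+
+// persist each batch of (word, count) pairs into the demo region
+runningCounts.saveToGemfire("str_int_region")
+```
+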
To run the demo, open 3 Terminals: + +**Terminal-1**, start net cat server: +``` +$ nc -lk 9999 +``` + +**Terminal-2**, start word count Spark app: +``` +bin/spark-submit --master=local[2] demo.NetworkWordCount $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port` +``` + +Switch to Terminal-1, type some words, and hit `enter` or `return` key, then check word count at **Terminal-3**, which has `gfsh` connected to the GemFire cluster: +``` +gfsh> query --query="select key, value from /str_int_region.entrySet" +``` + +### Shutdown GemFire cluster at the end +Use following command to shutdown the GemFire cluster after playing with +the demos: +``` +gfsh> shutdown --include-locators=true +``` + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/1_building.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/1_building.md b/gemfire-spark-connector/doc/1_building.md new file mode 100644 index 0000000..ece4a9c --- /dev/null +++ b/gemfire-spark-connector/doc/1_building.md @@ -0,0 +1,29 @@ +## Building and Testing + +You will need Scala 2.10 and sbt 0.13.5 to 0.13.7. + +### Building Artifacts + +In the root directory of connector project, run: +``` +sbt clean package +``` + +The following jar files will be created: + - `gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar` + - `gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar` + - `gemfire-spark-demos/target/scala-2.10/gemfire-spark-demos_2.10-0.5.0.jar ` + +### Testing +Commands to run unit and integration tests: +``` +sbt test // unit tests +sbt it:test // integration tests +``` + +Integration tests start up a GemFire cluster and starts up Spark in local mode. +Please make sure you've done following before you run `sbt it:test`: + - run`sbt package` + - set environment variable `GEMFIRE` to point to a GemFire installation. + +Next: [Quick Start](2_quick.md) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/2_quick.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/2_quick.md b/gemfire-spark-connector/doc/2_quick.md new file mode 100644 index 0000000..b05302b --- /dev/null +++ b/gemfire-spark-connector/doc/2_quick.md @@ -0,0 +1,178 @@ +## 5 Minutes Quick Start Guide + +In this quick start guide, you will learn how to use Spark shell to test Spark +GemFire Connector functionalities. + +### Prerequisites + +Before you start, you should have basic knowledge of GemFire and Apache Spark. +Please refer to [GemFire Documentation](http://gemfire.docs.pivotal.io/latest/userguide/index.html) +and [Spark Documentation](https://spark.apache.org/docs/latest/index.html) for +the details. If you are new to GemFire, this +[tutorial](http://gemfire.docs.pivotal.io/latest/userguide/index.html#getting_started/gemfire_tutorial/chapter_overview.html) +is a good starting point. + +You need 2 terminals to follow along, one for GemFire `gfsh`, and one for Spark shell. Set up Jdk 1.7 on both of them. + +### GemFire `gfsh` terminal +In this terminal, start GemFire cluster, deploy Connector's gemfire-function jar, and create demo regions. 
+ +Set up environment variables: +``` +export JAVA_HOME= +export GEMFIRE= +export CONNECTOR= +export CLASSPATH=$CLASSPATH:$GEMFIRE/lib/locator-dependencies.jar:$GEMFIRE/lib/server-dependencies.jar:$GEMFIRE/lib/gfsh-dependencies.jar +export PATH=$PATH:$GEMFIRE/bin +export GF_JAVA=$JAVA_HOME/bin/java +``` + +Start GemFire cluster with 1 locator and 2 servers: +``` +gfsh +gfsh>start locator --name=locator1 --port=55221 +gfsh>start server --name=server1 --locators=localhost[55221] --server-port=0 +gfsh>start server --name=server2 --locators=localhost[55221] --server-port=0 +``` + +Then create two demo regions: +``` +gfsh>create region --name=str_str_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.String +gfsh>create region --name=int_str_region --type=PARTITION --key-constraint=java.lang.Integer --value-constraint=java.lang.String +``` + +Deploy Connector's gemfire-function jar (`gemfire-functions_2.10-0.5.0.jar`): +``` +gfsh>deploy --jar=/gemfire-functions/target/scala-2.10/gemfire-functions_2.10-0.5.0.jar +``` + +### Spark shell terminal +In this terminal, setup Spark environment, and start Spark shell. + +Set GemFire locator property in Spark configuration: add +following to `/conf/spark-defaults.conf`: +``` +spark.gemfire.locators=localhost[55221] +``` +Note: + - if the file doesn't exist, create one. + - replace string `localhost[55221]` with your own locator host and port. + +By default, Spark shell output lots of info log, if you want to +turn off info log, change `log4j.rootCategory` to `WARN, console` +in file `/conf/conf/log4j.properties`: +``` +log4j.rootCategory=WARN, console +``` +if file `log4j.properties` doesn't exist, copy `log4j.properties.template` +under the same directory to `log4j.properties` and update the file. + +Start spark-shell: +``` +bin/spark-shell --master local[*] --jars $CONNECTOR/gemfire-spark-connector/target/scala-2.10/gemfire-spark-connector_2.10-0.5.0.jar,$GEMFIRE/lib/server-dependencies.jar +``` + +Check GemFire locator property in the Spark shell: +``` +scala> sc.getConf.get("spark.gemfire.locators") +res0: String = localhost[55221] +``` + +In order to enable GemFire specific functions, you need to import +`io.pivotal.gemfire.spark.connector._` +``` +scala> import io.pivotal.gemfire.spark.connector._ +import io.pivotal.gemfire.spark.connector._ +``` + +### Save Pair RDD to GemFire +In the Spark shell, create a simple pair RDD and save it to GemFire: +``` +scala> val data = Array(("1", "one"), ("2", "two"), ("3", "three")) +data: Array[(String, String)] = Array((1,one), (2,two), (3,three)) + +scala> val distData = sc.parallelize(data) +distData: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at :14 + +scala> distData.saveToGemfire("str_str_region") +15/02/17 07:11:54 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:29, took 0.341288 s +``` + +Verify the data is saved in GemFile using `gfsh`: +``` +gfsh>query --query="select key,value from /str_str_region.entries" + +Result : true +startCount : 0 +endCount : 20 +Rows : 3 + +key | value +--- | ----- +1 | one +3 | three +2 | two + +NEXT_STEP_NAME : END +``` + +### Save Non-Pair RDD to GemFire +Saving non-pair RDD to GemFire requires an extra function that converts each +element of RDD to a key-value pair. 
Here's a sample session in the Spark shell:
+```
+scala> val data2 = Array("a","ab","abc")
+data2: Array[String] = Array(a, ab, abc)
+
+scala> val distData2 = sc.parallelize(data2)
+distData2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:17
+
+scala> distData2.saveToGemfire("int_str_region", e => (e.length, e))
+[info 2015/02/17 12:43:21.174 PST
tid=0x1] +... +15/02/17 12:43:21 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:52, took 0.251194 s +``` + +Verify the result with `gfsh`: +``` +gfsh>query --query="select key,value from /int_str_region.entrySet" + +Result : true +startCount : 0 +endCount : 20 +Rows : 3 + +key | value +--- | ----- +2 | ab +3 | abc +1 | a + +NEXT_STEP_NAME : END +``` + +### Expose GemFire Region As RDD +The same API is used to expose both replicated and partitioned region as RDDs. +``` +scala> val rdd = sc.gemfireRegion[String, String]("str_str_region") +rdd: io.pivotal.gemfire.spark.connector.rdd.GemFireRDD[String,String] = GemFireRDD[2] at RDD at GemFireRDD.scala:19 + +scala> rdd.foreach(println) +(1,one) +(3,three) +(2,two) + +scala> val rdd2 = sc.gemfireRegion[Int, String]("int_str_region") +rdd2: io.pivotal.gemfire.spark.connector.rdd.GemFireRDD[Int,String] = GemFireRDD[3] at RDD at GemFireRDD.scala:19 + +scala> rdd2.foreach(println) +(2,ab) +(1,a) +(3,abc) +``` +Note: use the right type of region key and value, otherwise you'll get +ClassCastException. + + +Next: [Connecting to GemFire](3_connecting.md) + + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/3_connecting.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/3_connecting.md b/gemfire-spark-connector/doc/3_connecting.md new file mode 100644 index 0000000..bac9785 --- /dev/null +++ b/gemfire-spark-connector/doc/3_connecting.md @@ -0,0 +1,55 @@ +## Connecting to GemFire + +There are two ways to connect Spark to Gemfire: + - Specify GemFire connection properties via `SparkConf`. + - Specify GemFire connection properties via `GemFireConnectionConf`. + +### Specify GemFire connection properties via `SparkConf` +The only required GemFire connection property is `spark.gemfire.locators`. +This can be specified in `/conf/spark-defaults.conf` or in Spark +application code. In the following examples, we assume you want to provide +3 extra properties: `security-client-auth-init`, `security-username`, and +`security-password`, note that they are prefixed with `spark.gemfire.`. + +In `/conf/spark-defaults.com` +``` +spark.gemfire.locators=192.168.1.47[10334] +spark.gemfire.security-client-auth-init=templates.security.UserPasswordAuthInit.create +spark.gemfire.security-username=scott +spark.gemfire.security-password=tiger +``` + +Or in the Spark application code: +``` +import io.pivotal.gemfire.spark.connector._ +val sparkConf = new SparkConf() + .set(GemFireLocatorPropKey, "192.168.1.47[10334]") + .set("spark.gemfire.security-client-auth-init", "templates.security.UserPasswordAuthInit.create") + .set("spark.gemfire.security-username", "scott") + .set("spark.gemfire.security-password", "tiger") +``` + +After this, you can use all connector APIs without providing `GemfireConnectionConf`. + +### Specify GemFire connection properties via `GemFireConnectionConf` +Here's the code that creates `GemFireConnectionConf` with the same set of +properties as the examples above: +``` +val props = Map("security-client-auth-init" -> "templates.security.UserPasswordAuthInit.create", + "security-username" -> "scott", + "security-password" -> "tiger") +val connConf = GemFireConnectionConf("192.168.1.47[10334]", props) +``` + +Please note that those properties are **not** prefixed with `spark.gemfire.`. + +After this, you can use all connector APIs that require `GemFireConnectionConf`. 
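+
+For example, the loading and saving APIs described in later sections accept the `connConf` created above as an extra argument (a minimal sketch; `str_str_region` is the demo region from the [Quick start](2_quick.md), and `String` key/value types are assumed):
+```
+import io.pivotal.gemfire.spark.connector._
+
+// read a region using the explicit connection configuration
+val rdd = sc.gemfireRegion[String, String]("str_str_region", connConf)
+
+// save a pair RDD using the same connection configuration
+sc.parallelize(Seq(("4", "four"), ("5", "five"))).saveToGemfire("str_str_region", connConf)
+```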
+
+### Notes about locators
+ - You can specify a locator in two formats: `host[port]` or `host:port`. For
+   example `192.168.1.47[10334]` or `192.168.1.47:10334`
+ - If your GemFire cluster has multiple locators, list them all, separated
+   by `,`. For example: `host1:10334,host2:10334`.
+
+
+Next: [Loading Data from GemFire](4_loading.md)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/4_loading.md
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/doc/4_loading.md b/gemfire-spark-connector/doc/4_loading.md
new file mode 100644
index 0000000..fb03660
--- /dev/null
+++ b/gemfire-spark-connector/doc/4_loading.md
@@ -0,0 +1,101 @@
+## Loading Data from GemFire
+
+To expose the full data set of a GemFire region as a Spark
+RDD, call the `gemfireRegion` method on the SparkContext object.
+
+```
+val rdd = sc.gemfireRegion("region path")
+```
+
+Or with a specific `GemFireConnectionConf` instance (see
+[Connecting to GemFire](3_connecting.md) for how to create GemFireConnectionConf):
+```
+val rdd = sc.gemfireRegion("region path", connConf)
+```
+
+## GemFire RDD Partitions
+
+GemFire has two region types: **replicated** and
+**partitioned**. A replicated region has the full dataset on
+each server, while a partitioned region has its dataset spread
+across multiple servers, and may keep redundant copies for high
+availability.
+
+Since a replicated region has its full dataset available on every
+server, there is only one RDD partition for a `GemFireRegionRDD` that
+represents a replicated region.
+
+For a `GemFireRegionRDD` that represents a partitioned region, there are
+many potential ways to create RDD partitions. So far, we have
+implemented ServerSplitsPartitioner, which will split the bucket set
+on each GemFire server into two RDD partitions by default.
+The number of splits is configurable; the following shows how to set
+three partitions per GemFire server:
+```
+import io.pivotal.gemfire.spark.connector._
+
+val opConf = Map(PreferredPartitionerPropKey -> ServerSplitsPartitionerName,
+                 NumberPartitionsPerServerPropKey -> "3")
+
+val rdd1 = sc.gemfireRegion[String, Int]("str_int_region", opConf = opConf)
+// or
+val rdd2 = sc.gemfireRegion[String, Int]("str_int_region", connConf, opConf)
+```
+
+
+## GemFire Server-Side Filtering
+Server-side filtering allows exposing a partial dataset of a GemFire region
+as an RDD, which reduces the amount of data transferred from GemFire to
+Spark and speeds up processing.
+```
+val rdd = sc.gemfireRegion("<region path>").where("<where clause>")
+```
+
+The above call is translated to the OQL query `select key, value from /<region path>.entries where <where clause>`, and the query is executed for each RDD partition. Note: the RDD partitions are created the same way as described in the section above.
+
+In the following demo, the JavaBean class `Emp` is used; it has 5 attributes: `id`, `lname`, `fname`, `age`, and `loc`. In order to make the `Emp` class available on GemFire servers, we need to deploy a jar file that contains the `Emp` class. Now build `emp.jar`, deploy it, and create the region `emps` in `gfsh`:
+```
+zip $CONNECTOR/gemfire-spark-demos/basic-demos/target/scala-2.10/basic-demos_2.10-0.5.0.jar \
+    -i "demo/Emp.class" --out $CONNECTOR/emp.jar
+
+gfsh
+gfsh> deploy --jar=<path to>/emp.jar
+gfsh> create region --name=emps --type=PARTITION
+```
+Note: The `Emp.class` is available in `basic-demos_2.10-0.5.0.jar`. But that jar file depends on many scala and spark classes that are not available on GemFire servers' classpath.
So use the above `zip` command to create a jar file that only contains `Emp.class`. + +Now in Spark shell, generate some random `Emp` records, and save them to region `emps` (remember to add `emp.jar` to Spark shell classpath before starting Spark shell): +``` +import io.pivotal.gemfire.spark.connector._ +import scala.util.Random +import demo.Emp + +val lnames = List("Smith", "Johnson", "Jones", "Miller", "Wilson", "Taylor", "Thomas", "Lee", "Green", "Parker", "Powell") +val fnames = List("John", "James", "Robert", "Paul", "George", "Kevin", "Jason", "Jerry", "Peter", "Joe", "Alice", "Sophia", "Emma", "Emily") +val locs = List("CA", "WA", "OR", "NY", "FL") +def rpick(xs: List[String]): String = xs(Random.nextInt(xs.size)) + +val d1 = (1 to 20).map(x => new Emp(x, rpick(lnames), rpick(fnames), 20+Random.nextInt(41), rpick(locs))).toArray +val rdd1 = sc.parallelize(d1) +rdd1.saveToGemfire("emps", e => (e.getId, e)) +``` + +Now create a RDD that contains all employees whose age is less than 40, and display its contents: +``` +val rdd1s = sc.gemfireRegion("emps").where("value.getAge() < 40") + +rdd1s.foreach(println) +(5,Emp(5, Taylor, Robert, 32, FL)) +(14,Emp(14, Smith, Jason, 28, FL)) +(7,Emp(7, Jones, Robert, 26, WA)) +(17,Emp(17, Parker, John, 20, WA)) +(2,Emp(2, Thomas, Emily, 22, WA)) +(10,Emp(10, Lee, Alice, 31, OR)) +(4,Emp(4, Wilson, James, 37, CA)) +(15,Emp(15, Powell, Jason, 34, NY)) +(3,Emp(3, Lee, Sophia, 32, OR)) +(9,Emp(9, Johnson, Sophia, 25, OR)) +(6,Emp(6, Miller, Jerry, 30, NY)) +``` + +Next: [RDD Join and Outer Join GemFire Region](5_rdd_join.md) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/5_rdd_join.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/5_rdd_join.md b/gemfire-spark-connector/doc/5_rdd_join.md new file mode 100644 index 0000000..edc86e8 --- /dev/null +++ b/gemfire-spark-connector/doc/5_rdd_join.md @@ -0,0 +1,237 @@ +## RDD Join and Outer Join GemFire Region + +The Spark GemFire Connector suports using any RDD as a source +of a join and outer join with a GemFire region through APIs +`joinGemfireRegion[K, V]` and `outerJoinGemfireRegion[K, V]`. +Those two APIs execute a single `region.getAll` call for every +partition of the source RDD, so no unnecessary data will be requested +or transferred. This means a join or outer join between any RDD and +a GemFire region can be performed without full region scan, and the +source RDD's partitioning and placement for data locality are used. + +Please note that the two type parameters `[K, V]` are the type +of key/value pair of region entries, they need to be specified +to make result RDD has correct type. + +The region `emps` that is created and populated in +[GemFire Server-Side Filtering](4_loading.md) will be used in the +following examples. + +### RDD[(K, V1)] join and outer join Region[K, V2] + +In this case, the source RDD is a pair RDD, and it has the same key +type as the Region. Use API `rdd.joinGemfireRegion[K, V2](regionPath)` and +`rdd.outerJoinGemfireRegion[K, V2](regionPath)` do the join and outer +join. 
+ +Prepare a source RDD `rdd2`: +``` +val d2 = (11 to 25).map(x => (x, s"message-$x")).toArray +val rdd2 = sc.parallelize(d2) +// print rdd2's content +rdd2.foreach(println) +(11,message-11) +(12,message-12) +(13,message-13) +(14,message-14) +(15,message-15) +(16,message-16) +(17,message-17) +(18,message-18) +(19,message-19) +(20,message-20) +(21,message-21) +(22,message-22) +(23,message-23) +(24,message-24) +(25,message-25) +``` + +Join RDD `rdd2` with region `emps`, and print out the result: +``` +val rdd2j = rdd2.joinGemfireRegion[Int, Emp]("emps") + +rdd2j.foreach(println) +((11,message-11),Emp(11, Taylor, Emma, 44, CA)) +((12,message-12),Emp(12, Taylor, Joe, 60, FL)) +((13,message-13),Emp(13, Lee, Kevin, 50, FL)) +((14,message-14),Emp(14, Smith, Jason, 28, FL)) +((15,message-15),Emp(15, Powell, Jason, 34, NY)) +((16,message-16),Emp(16, Thomas, Alice, 42, OR)) +((17,message-17),Emp(17, Parker, John, 20, WA)) +((18,message-18),Emp(18, Powell, Alice, 58, FL)) +((19,message-19),Emp(19, Taylor, Peter, 46, FL)) +((20,message-20),Emp(20, Green, Peter, 57, CA)) +``` +Note that there's no pairs in the result RDD `rdd2j` corresponding to +the pairs with id from 21 to 25 in RDD `rdd2` since there's no region +entries have those key values. + +Outer join RDD `rdd2` with region `emps`, and print out the result: +``` +val rdd2o = rdd2.outerJoinGemfireRegion[Int, Emp]("emps") + +rdd2o.foreach(println) +((18,message-18),Some(Emp(18, Powell, Alice, 58, FL))) +((19,message-19),Some(Emp(19, Taylor, Peter, 46, FL))) +((11,message-11),Some(Emp(11, Taylor, Emma, 44, CA))) +((12,message-12),Some(Emp(12, Taylor, Joe, 60, FL))) +((20,message-20),Some(Emp(20, Green, Peter, 57, CA))) +((21,message-21),None) +((22,message-22),None) +((23,message-23),None) +((24,message-24),None) +((25,message-25),None) +((13,message-13),Some(Emp(13, Lee, Kevin, 50, FL))) +((14,message-14),Some(Emp(14, Smith, Jason, 28, FL))) +((15,message-15),Some(Emp(15, Powell, Jason, 34, NY))) +((16,message-16),Some(Emp(16, Thomas, Alice, 42, OR))) +((17,message-17),Some(Emp(17, Parker, John, 20, WA))) +``` +Note that there are pairs in the result RDD `rdd2o` corresponding to +the pairs with id from 21 to 25 in the RDD `rdd2`, and values are `None` +since there's no region entries have those key values. + +### RDD[(K1, V1)] join and outer join Region[K2, V2] + +In this case, the source RDD is still a pair RDD, but it has different +key type. Use API `rdd.joinGemfireRegion[K2, V2](regionPath, func)` and +`rdd.outerJoinGemfireRegion[K2, V2](regionPath, func)` do the join and +outer join, where `func` is the function to generate key from (k, v) +pair, the element of source RDD, to join with GemFire region. 
+ +Prepare a source RDD `d3`: +``` +val d3 = (11 to 25).map(x => (s"message-$x", x)).toArray +val rdd3 = sc.parallelize(d3) +// print rdd3's content +rdd3.foreach(println) +(message-18,18) +(message-19,19) +(message-11,11) +(message-20,20) +(message-21,21) +(message-22,22) +(message-12,12) +(message-23,23) +(message-24,24) +(message-25,25) +(message-13,13) +(message-14,14) +(message-15,15) +(message-16,16) +(message-17,17) +``` + +Join RDD `rdd3` (RDD[(String, Int)] with region `emps` (Region[Int, Emp]), and print out the result: +``` +val rdd3j = rdd3.joinGemfireRegion[Int, Emp]("emps", pair => pair._2) + +rdd3j.foreach(println) +((message-18,18),Emp(18, Powell, Alice, 58, FL)) +((message-19,19),Emp(19, Taylor, Peter, 46, FL)) +((message-20,20),Emp(20, Green, Peter, 57, CA)) +((message-11,11),Emp(11, Taylor, Emma, 44, CA)) +((message-12,12),Emp(12, Taylor, Joe, 60, FL)) +((message-13,13),Emp(13, Lee, Kevin, 50, FL)) +((message-14,14),Emp(14, Smith, Jason, 28, FL)) +((message-15,15),Emp(15, Powell, Jason, 34, NY)) +((message-16,16),Emp(16, Thomas, Alice, 42, OR)) +((message-17,17),Emp(17, Parker, John, 20, WA)) +``` +Note `pair => pair._2` means use the 2nd element of the element of source +RDD and join key. + +Outer join RDD `rdd3` with region `emps`, and print out the result: +``` +val rdd3o = rdd3.outerJoinGemfireRegion[Int, Emp]("emps", pair => pair._2) + +rdd3o.foreach(println) +((message-18,18),Some(Emp(18, Powell, Alice, 58, FL))) +((message-11,11),Some(Emp(11, Taylor, Emma, 44, CA))) +((message-19,19),Some(Emp(19, Taylor, Peter, 46, FL))) +((message-12,12),Some(Emp(12, Taylor, Joe, 60, FL))) +((message-20,20),Some(Emp(20, Green, Peter, 57, CA))) +((message-13,13),Some(Emp(13, Lee, Kevin, 50, FL))) +((message-21,21),None) +((message-14,14),Some(Emp(14, Smith, Jason, 28, FL))) +((message-22,22),None) +((message-23,23),None) +((message-24,24),None) +((message-25,25),None) +((message-15,15),Some(Emp(15, Powell, Jason, 34, NY))) +((message-16,16),Some(Emp(16, Thomas, Alice, 42, OR))) +((message-17,17),Some(Emp(17, Parker, John, 20, WA))) +``` + +### RDD[T] join and outer join Region[K, V] + +Use API `rdd.joinGemfireRegion[K, V](regionPath, func)` and +`rdd.outerJoinGemfireRegion[K, V](regionPath, func)` do the join +and outer join, where `func` is the function to generate key from +`t`, the element of source RDD, to join with GemFire region. 
+ +Prepare a source RDD `d4`: +``` +val d4 = (11 to 25).map(x => x * 2).toArray +val rdd4 = sc.parallelize(d4) +// print rdd4's content +rdd4.foreach(println) +22 +24 +36 +38 +40 +42 +44 +46 +26 +28 +48 +30 +32 +34 +50 +``` + +Join RDD `d4` with region `emps`, and print out the result: +``` +val rdd4j = rdd4.joinGemfireRegion[Int, Emp]("emps", x => x/2) + +rdd4j.foreach(println) +(22,Emp(11, Taylor, Emma, 44, CA)) +(24,Emp(12, Taylor, Joe, 60, FL)) +(26,Emp(13, Lee, Kevin, 50, FL)) +(28,Emp(14, Smith, Jason, 28, FL)) +(30,Emp(15, Powell, Jason, 34, NY)) +(32,Emp(16, Thomas, Alice, 42, OR)) +(34,Emp(17, Parker, John, 20, WA)) +(36,Emp(18, Powell, Alice, 58, FL)) +(38,Emp(19, Taylor, Peter, 46, FL)) +(40,Emp(20, Green, Peter, 57, CA)) +``` + +Outer join RDD `d4` with region `emps`, and print out the result: +``` +val rdd4o = rdd4.outerJoinGemfireRegion[Int, Emp]("emps", x => x/2) + +rdd4o.foreach(println) +(36,Some(Emp(18, Powell, Alice, 58, FL))) +(38,Some(Emp(19, Taylor, Peter, 46, FL))) +(40,Some(Emp(20, Green, Peter, 57, CA))) +(42,None) +(44,None) +(46,None) +(48,None) +(50,None) +(22,Some(Emp(11, Taylor, Emma, 44, CA))) +(24,Some(Emp(12, Taylor, Joe, 60, FL))) +(26,Some(Emp(13, Lee, Kevin, 50, FL))) +(28,Some(Emp(14, Smith, Jason, 28, FL))) +(30,Some(Emp(15, Powell, Jason, 34, NY))) +(32,Some(Emp(16, Thomas, Alice, 42, OR))) +(34,Some(Emp(17, Parker, John, 20, WA))) +``` + + +Next: [Saving RDD to GemFire](6_save_join.md) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/6_save_rdd.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/6_save_rdd.md b/gemfire-spark-connector/doc/6_save_rdd.md new file mode 100644 index 0000000..8516955 --- /dev/null +++ b/gemfire-spark-connector/doc/6_save_rdd.md @@ -0,0 +1,61 @@ +## Saving RDD to GemFire + +It is possible to save any RDD to a GemFire region. The requirements are: + - the object class of the elements contained by the RDD is + (1) available on the classpath of GemFire servers + (2) and serializable. + - the target region exists. + +To save an RDD to an existing GemFire region, import +`io.pivotal.gemfire.spark.connector._` and call the `saveToGemfire` +method on RDD. + +### Save RDD[(K, V)] to GemFire +For pair RDD, i.e., RDD[(K, V)], the pair is treated as key/value pair. +``` +val data = Array(("1","one"),("2","two"),("3","three")) +val rdd = sc.parallelize(data) +rdd.saveToGemfire("str_str_region") +``` + +If you create GemFireConnectionConf as described in +[Connecting to Gemfire](3_connecting.md), the last statement becomes: +``` +rdd.saveToGemFire("str_str_region", connConf) +``` + +You can verify the region contents: +``` +gfsh>query --query="select key, value from /str_str_region.entrySet" + +Result : true +startCount : 0 +endCount : 20 +Rows : 3 + +key | value +--- | ----- +1 | one +3 | three +2 | two +``` + +Note that GemFire regions require unique keys, so if the pair RDD +contains duplicated keys, those pairs with the same key are overwriting +each other, and only one of them appears in the final dataset. + +### Save RDD[T] to GemFire +To save non-pair RDD to GemFire, a function (`f: T => K`) that creates keys +from elements of RDD, and is used to convert RDD element `T` to pair `(f(T), T)`, +then the pair is save to GemFire. 
+ +``` +val data2 = Array("a","ab","abc") +val rdd2 = sc.parallelize(data2) +rdd2.saveToGemfire("str_int_region", e => (e, e.length)) +// or use GemFireConnectionConf object directly +// rdd2.saveToGemfire("rgnb", e => (e, e.length), connConf) +``` + + +Next: [Saving DStream to GemFire](7_save_dstream.md) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/7_save_dstream.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/7_save_dstream.md b/gemfire-spark-connector/doc/7_save_dstream.md new file mode 100644 index 0000000..17903fa --- /dev/null +++ b/gemfire-spark-connector/doc/7_save_dstream.md @@ -0,0 +1,68 @@ +## Saving DStream to GemFire +Spark Streaming extends the core API to allow high-throughput, fault-tolerant +stream processing of live data streams. Data can be ingested from many +sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. +Results can be stored in GemFire. + +### A Simple Spark Streaming App: Stateful Network Word Count + +Create a `StreamingContext` with a `SparkConf` configuration +``` +val ssc = new StreamingContext(sparkConf, Seconds(1)) +``` + +Create a DStream that will connect to net cat server `host:port` +``` +val lines = ssc.socketTextStream(host, port) +``` + +Count each word in each batch +``` +val words = lines.flatMap(_.split(" ")) +val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) +``` + +Use `updateStateByKey` to maintain a running count of each word seen in a text +data stream. Here, the running count is the state and it is an integer. We +define the update function as +``` +val updateFunc = (values: Seq[Int], state: Option[Int]) => { + val currentCount = values.foldLeft(0)(_ + _) + val previousCount = state.getOrElse(0) + Some(currentCount + previousCount) +} +``` + +This is applied on a DStream containing words (say, the pairs DStream containing +`(word, 3)` pairs in the earlier example +``` +val runningCounts = wordCounts.updateStateByKey[Int](updateFunction _) +``` + +Print a few of the counts to the console. Start the computation. +``` +runningCounts.print() +ssc.start() +ssc.awaitTermination() // Wait for the computation to terminate +``` + +#### Spark Streaming With GemFire +Now let's save the running word count to GemFire region `str_int_region`, which +simply replace print() with saveToGemfire(): + +``` +import io.pivotal.gemfire.spark.connector.streaming._ +runningCounts.saveToGemfire("str_int_region") +``` + +You can use the version of saveToGemfire that has the parameter `GemFireConnectionConf`: +``` +runningCounts.saveToGemfire("str_int_region", connConf) +``` + +See [Spark Streaming Programming Guide] +(http://spark.apache.org/docs/latest/streaming-programming-guide.html) for +more details about Sarpk streaming programming. + + +Next: [GemFire OQL](8_oql.md) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/8_oql.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/8_oql.md b/gemfire-spark-connector/doc/8_oql.md new file mode 100644 index 0000000..bad1f3c --- /dev/null +++ b/gemfire-spark-connector/doc/8_oql.md @@ -0,0 +1,60 @@ +## GemFire OQL Query +Spark GemFire Connector lets us run GemFire OQL queries in Spark applications +to retrieve data from GemFire. The query result is a Spark DataFrame. Note +that as of Spark 1.3, SchemaRDD is deprecated. 
Spark GemFire Connector does +not support SchemaRDD. + +An instance of `SQLContext` is required to run OQL query. +``` +val sqlContext = new org.apache.spark.sql.SQLContext(sc) +``` + +Create a `DataFrame` using OQL: +``` +val dataFrame = sqlContext.gemfireOQL("SELECT * FROM /CustomerRegion WHERE status = 'active'") +``` + +You can repartition the `DataFrame` using `DataFrame.repartition()` if needed. +Once you have the `DataFrame`, you can register it as a table and use Spark +SQL to query it: +``` +dataFrame.registerTempTable("customer") +val SQLResult = sqlContext.sql("SELECT * FROM customer WHERE id > 100") +``` + +##Serialization +If the OQL query involves User Defined Type (UDT), and the default Java +serializer is used, then the UDT on GemFire must implement `java.io.Serializable`. + +If KryoSerializer is preferred, as described in [Spark Documentation] +(https://spark.apache.org/docs/latest/tuning.html), you can configure +`SparkConf` as the following example: +``` +val conf = new SparkConf() + .setAppName("MySparkApp") + .setMaster("local[*]") + .set(GemFireLocatorPropKey, "localhost[55221]") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator") +``` + +and register the classes (optional) +``` +conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) +``` + +Use the following options to start Spark shell: +``` + --conf spark.serializer=org.apache.spark.serializer.KryoSerializer + --conf spark.kryo.registrator=io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator +``` + +## References +[GemFire OQL Documentation](http://gemfire.docs.pivotal.io/latest/userguide/index.html#developing/querying_basics/chapter_overview.html) + +[GemFire OQL Examples](http://gemfire.docs.pivotal.io/latest/userguide/index.html#getting_started/quickstart_examples/querying.html) + +[Spark SQL Documentation](https://spark.apache.org/docs/latest/sql-programming-guide.html) + + +Next: [Using Connector in Java](9_java_api.md) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/doc/9_java_api.md ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/doc/9_java_api.md b/gemfire-spark-connector/doc/9_java_api.md new file mode 100644 index 0000000..f13fb9b --- /dev/null +++ b/gemfire-spark-connector/doc/9_java_api.md @@ -0,0 +1,129 @@ +## Using Connector in Java +This section describes how to access the functionality of Spark GemFire +Connector when you write your Spark applications in Java. It is assumed +that you already familiarized yourself with the previous sections and +understand how the Spark GemFire Connector works. + +### Prerequisites +The best way to use the Spark GemFire Connector Java API is to statically +import all of the methods in `GemFireJavaUtil`. This utility class is +the main entry point for Spark GemFire Connector Java API. 
+```
+import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
+```
+
+Create a JavaSparkContext (don't forget about the static import):
+```
+SparkConf conf = new SparkConf();
+conf.set(GemFireLocatorPropKey, "192.168.1.47[10334]");
+JavaSparkContext jsc = new JavaSparkContext(conf);
+```
+
+### Accessing GemFire region in Java
+A GemFire region is exposed as a `GemFireJavaRegionRDD<K, V>` (subclass of
+`JavaPairRDD<K, V>`):
+```
+GemFireJavaRegionRDD<Integer, Emp> rdd1 = javaFunctions(jsc).gemfireRegion("emps");
+GemFireJavaRegionRDD<Integer, Emp> rdd2 = rdd1.where("value.getAge() < 40");
+```
+
+### RDD Join and Outer Join
+Use `rdd3` and region `emps` from the [join and outer join examples](5_rdd_join.md):
+```
+static class MyKeyFunction implements Function<Tuple2<String, Integer>, Integer> {
+  @Override public Integer call(Tuple2<String, Integer> pair) throws Exception {
+    return pair._2();
+  }
+}
+
+MyKeyFunction func = new MyKeyFunction();
+
+JavaPairRDD<Tuple2<String, Integer>, Emp> rdd3j =
+  javaFunctions(rdd3).joinGemfireRegion("emps", func);
+
+JavaPairRDD<Tuple2<String, Integer>, Option<Emp>> rdd3o =
+  javaFunctions(rdd3).outerJoinGemfireRegion("emps", func);
+
+```
+
+### Saving JavaPairRDD to GemFire
+Saving a JavaPairRDD is straightforward:
+```
+List<Tuple2<String, String>> data = new ArrayList<>();
+data.add(new Tuple2<>("7", "seven"));
+data.add(new Tuple2<>("8", "eight"));
+data.add(new Tuple2<>("9", "nine"));
+
+// create JavaPairRDD<String, String>
+JavaPairRDD<String, String> rdd1 = jsc.parallelizePairs(data);
+// save to GemFire
+javaFunctions(rdd1).saveToGemfire("str_str_region");
+```
+
+In order to save a `JavaRDD<Tuple2<K, V>>`, it needs to be converted to a
+`JavaPairRDD<K, V>` via the static method `toJavaPairRDD` from `GemFireJavaUtil`:
+```
+List<Tuple2<String, String>> data2 = new ArrayList<>();
+data2.add(new Tuple2<>("11", "eleven"));
+data2.add(new Tuple2<>("12", "twelve"));
+data2.add(new Tuple2<>("13", "thirteen"));
+
+// create JavaRDD<Tuple2<String, String>>
+JavaRDD<Tuple2<String, String>> rdd2 = jsc.parallelize(data2);
+// save to GemFire
+javaFunctions(toJavaPairRDD(rdd2)).saveToGemfire("str_str_region");
+```
+
+### Saving JavaRDD to GemFire
+Similar to the Scala version, a function is required to generate a key/value
+pair from each RDD element. The following `PairFunction` generates a
+`<String, Integer>` pair from a `String`:
+```
+PairFunction<String, String, Integer> pairFunc =
+  new PairFunction<String, String, Integer>() {
+    @Override public Tuple2<String, Integer> call(String s) throws Exception {
+      return new Tuple2<String, Integer>(s, s.length());
+    }
+  };
+```
+Note: there are 3 type parameters for PairFunction, they are:
+ 1. type of JavaRDD element
+ 2. type of key of output key/value pair
+ 3. type of value of output key/value pair
+
+Once `PairFunction` is ready, the rest is easy:
+```
+// create demo JavaRDD<String>
+List<String> data = new ArrayList<String>();
+data.add("a");
+data.add("ab");
+data.add("abc");
+JavaRDD<String> jrdd = jsc.parallelize(data);
+
+javaFunctions(jrdd).saveToGemfire("str_int_region", pairFunc);
+```
+
+### Saving JavaPairDStream and JavaDStream
+Saving a JavaPairDStream and a JavaDStream is similar to saving a JavaPairRDD
+and a JavaRDD:
+```
+JavaPairDStream<String, String> ds1 = ...
+javaFunctions(ds1).saveToGemfire("str_str_region");
+
+JavaDStream<String> ds2 = ...
+javaFunctions(ds2).saveToGemfire("str_int_region", pairFunc);
+```
+
+### Using GemFire OQL
+
+There are two gemfireOQL Java APIs, with and without GemFireConnectionConf.
+Here is an example without GemFireConnectionConf; it will use the default
+GemFireConnectionConf internally.
+``` +// assume there's jsc: JavaSparkContext +SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc); +DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region"); +df.show(); +``` + +Next: [About The Demos] (10_demos.md) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java new file mode 100644 index 0000000..55fe7e5 --- /dev/null +++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/RegionMetadata.java @@ -0,0 +1,77 @@ +package io.pivotal.gemfire.spark.connector.internal; + +import com.gemstone.gemfire.distributed.internal.ServerLocation; + +import java.util.HashMap; +import java.util.HashSet; +import java.io.Serializable; + +/** + * This class contains all info required by GemFire RDD partitioner to create partitions. + */ +public class RegionMetadata implements Serializable { + + private String regionPath; + private boolean isPartitioned; + private int totalBuckets; + private HashMap> serverBucketMap; + private String keyTypeName; + private String valueTypeName; + + /** + * Default constructor. + * @param regionPath the full path of the given region + * @param isPartitioned true for partitioned region, false otherwise + * @param totalBuckets number of total buckets for partitioned region, ignored otherwise + * @param serverBucketMap gemfire server (host:port pair) to bucket set map + * @param keyTypeName region key class name + * @param valueTypeName region value class name + */ + public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap> serverBucketMap, + String keyTypeName, String valueTypeName) { + this.regionPath = regionPath; + this.isPartitioned = isPartitioned; + this.totalBuckets = totalBuckets; + this.serverBucketMap = serverBucketMap; + this.keyTypeName = keyTypeName; + this.valueTypeName = valueTypeName; + } + + public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap> serverBucketMap) { + this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null); + } + + public String getRegionPath() { + return regionPath; + } + + public boolean isPartitioned() { + return isPartitioned; + } + + public int getTotalBuckets() { + return totalBuckets; + } + + public HashMap> getServerBucketMap() { + return serverBucketMap; + } + + public String getKeyTypeName() { + return keyTypeName; + } + + public String getValueTypeName() { + return valueTypeName; + } + + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("RegionMetadata(region=").append(regionPath) + .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")") + .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets) + .append(", map=").append(serverBucketMap).append(")"); + return buf.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java 
---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java new file mode 100644 index 0000000..0f4ef1d --- /dev/null +++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/QueryFunction.java @@ -0,0 +1,83 @@ +package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.execute.*; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.internal.HeapDataOutputStream; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext; +import com.gemstone.gemfire.internal.logging.LogService; +import org.apache.logging.log4j.Logger; +import java.util.Iterator; + +public class QueryFunction implements Function { + + private static final long serialVersionUID = 4866641340803692882L; + + public final static String ID = "gemfire-spark-query-function"; + + private final static QueryFunction instance = new QueryFunction(); + + private static final Logger logger = LogService.getLogger(); + + private static final int CHUNK_SIZE = 1024; + + @Override + public String getId() { + return ID; + } + + public static QueryFunction getInstance() { + return instance; + } + + @Override + public boolean optimizeForWrite() { + return true; + } + + @Override + public boolean isHA() { + return true; + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public void execute(FunctionContext context) { + try { + String[] args = (String[]) context.getArguments(); + String queryString = args[0]; + String bucketSet = args[1]; + InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context; + LocalRegion localRegion = (LocalRegion) irfc.getDataSet(); + boolean partitioned = localRegion.getDataPolicy().withPartitioning(); + Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString); + Object result = partitioned ? 
query.execute((InternalRegionFunctionContext) context) : query.execute(); + ResultSender sender = context.getResultSender(); + HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null); + Iterator iter = ((SelectResults) result).asList().iterator(); + while (iter.hasNext()) { + Object row = iter.next(); + DataSerializer.writeObject(row, buf); + if (buf.size() > CHUNK_SIZE) { + sender.sendResult(buf.toByteArray()); + logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size()); + buf.reset(); + } + } + sender.lastResult(buf.toByteArray()); + logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size()); + buf.reset(); + } + catch(Exception e) { + throw new FunctionException(e); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java new file mode 100644 index 0000000..50c35c5 --- /dev/null +++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionFunction.java @@ -0,0 +1,192 @@ +package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions; + +import java.util.Iterator; +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.execute.FunctionException; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.internal.cache.*; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.execute.Function; +import com.gemstone.gemfire.cache.execute.FunctionContext; +import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; +import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext; +import com.gemstone.gemfire.internal.cache.execute.InternalResultSender; +import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator; +import com.gemstone.gemfire.internal.logging.LogService; + +/** + * GemFire function that is used by `SparkContext.gemfireRegion(regionPath, whereClause)` + * to retrieve region data set for the given bucket set as a RDD partition + **/ +public class RetrieveRegionFunction implements Function { + + public final static String ID = "spark-gemfire-retrieve-region"; + private static final Logger logger = LogService.getLogger(); + private static final RetrieveRegionFunction instance = new RetrieveRegionFunction(); + + public RetrieveRegionFunction() { + } + + /** ------------------------------------------ */ + /** interface Function implementation */ + /** ------------------------------------------ */ + + public static RetrieveRegionFunction getInstance() { + return instance; + } + + @Override + public String getId() { + return ID; + } + + @Override + public boolean hasResult() 
{ + return true; + } + + @Override + public boolean optimizeForWrite() { + return true; + } + + @Override + public boolean isHA() { + return true; + } + + @Override + public void execute(FunctionContext context) { + String[] args = (String[]) context.getArguments(); + String where = args[0]; + String taskDesc = args[1]; + InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context; + LocalRegion localRegion = (LocalRegion) irfc.getDataSet(); + boolean partitioned = localRegion.getDataPolicy().withPartitioning(); + if (where.trim().isEmpty()) + retrieveFullRegion(irfc, partitioned, taskDesc); + else + retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc); + } + + /** ------------------------------------------ */ + /** Retrieve region data with where clause */ + /** ------------------------------------------ */ + + private void retrieveRegionWithWhereClause( + InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) { + String regionPath = localRegion.getFullPath(); + String qstr = "select key, value from " + regionPath + ".entries where " + where; + logger.info(desc + ": " + qstr); + + try { + Cache cache = CacheFactory.getAnyInstance(); + QueryService queryService = cache.getQueryService(); + Query query = queryService.newQuery(qstr); + SelectResults results = + (SelectResults) (partitioned ? query.execute(context) : query.execute()); + + Iterator entries = getStructIteratorWrapper(results.asList().iterator()); + InternalResultSender irs = (InternalResultSender) context.getResultSender(); + StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc); + sender.send(); + } catch (Exception e) { + throw new FunctionException(e); + } + } + + private Iterator getStructIteratorWrapper(Iterator entries) { + return new WrapperIterator>(entries) { + @Override public Object[] next() { + return delegate.next().getFieldValues(); + } + }; + } + + /** ------------------------------------------ */ + /** Retrieve full region data */ + /** ------------------------------------------ */ + + private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) { + Iterator entries; + if (partitioned) { + PREntriesIterator iter = (PREntriesIterator) + ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator(); + // entries = getPREntryIterator(iter); + entries = getSimpleEntryIterator(iter); + } else { + LocalRegion owner = (LocalRegion) context.getDataSet(); + Iterator iter = (Iterator) owner.entrySet().iterator(); + // entries = getRREntryIterator(iter, owner); + entries = getSimpleEntryIterator(iter); + } + InternalResultSender irs = (InternalResultSender) context.getResultSender(); + StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc); + sender.send(); + } + +// /** An iterator for partitioned region that uses internal API to get serialized value */ +// private Iterator getPREntryIterator(PREntriesIterator iterator) { +// return new WrapperIterator>(iterator) { +// @Override public Object[] next() { +// Region.Entry entry = delegate.next(); +// int bucketId = delegate.getBucketId(); +// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId); +// // owner needs to be the bucket region not the enclosing partition region +// LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId); +// Object value = 
owner.getDeserializedValue(keyInfo, false, true, true, null, false); +// return new Object[] {keyInfo.getKey(), value}; +// } +// }; +// } +// +// /** An iterator for replicated region that uses internal API to get serialized value */ +// private Iterator getRREntryIterator(Iterator iterator, LocalRegion region) { +// final LocalRegion owner = region; +// return new WrapperIterator>(iterator) { +// @Override public Object[] next() { +// Region.Entry entry = delegate.next(); +// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null); +// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false); +// return new Object[] {keyInfo.getKey(), value}; +// } +// }; +// } + + // todo. compare performance of regular and simple iterator + /** An general iterator for both partitioned and replicated region that returns un-serialized value */ + private Iterator getSimpleEntryIterator(Iterator iterator) { + return new WrapperIterator>(iterator) { + @Override public Object[] next() { + Region.Entry entry = delegate.next(); + return new Object[] {entry.getKey(), entry.getValue()}; + } + }; + } + + /** ------------------------------------------ */ + /** abstract wrapper iterator */ + /** ------------------------------------------ */ + + /** An abstract wrapper iterator to reduce duplicated code of anonymous iterators */ + abstract class WrapperIterator> implements Iterator { + + final S delegate; + + protected WrapperIterator(S delegate) { + this.delegate = delegate; + } + + @Override public boolean hasNext() { + return delegate.hasNext(); + } + + @Override public void remove() { } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java new file mode 100644 index 0000000..c11c231 --- /dev/null +++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/RetrieveRegionMetadataFunction.java @@ -0,0 +1,102 @@ +package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions; + +import com.gemstone.gemfire.cache.execute.Function; +import com.gemstone.gemfire.cache.execute.FunctionContext; +import com.gemstone.gemfire.cache.execute.ResultSender; +import com.gemstone.gemfire.distributed.internal.ServerLocation; +import com.gemstone.gemfire.internal.cache.BucketServerLocation66; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext; +import io.pivotal.gemfire.spark.connector.internal.RegionMetadata; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +/** + * This GemFire function retrieve region metadata + */ +public class RetrieveRegionMetadataFunction implements Function { + + public final static String ID = "gemfire-spark-retrieve-region-metadata"; + + private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction(); + + public 
RetrieveRegionMetadataFunction() { + } + + public static RetrieveRegionMetadataFunction getInstance() { + return instance; + } + + @Override + public String getId() { + return ID; + } + + @Override + public boolean optimizeForWrite() { + return false; + } + + @Override + public boolean isHA() { + return true; + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public void execute(FunctionContext context) { + LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet(); + String regionPath = region.getFullPath(); + boolean isPartitioned = region.getDataPolicy().withPartitioning(); + String kTypeName = getTypeClassName(region.getAttributes().getKeyConstraint()); + String vTypeName = getTypeClassName(region.getAttributes().getValueConstraint()); + + RegionMetadata metadata; + if (! isPartitioned) { + metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, vTypeName); + } else { + PartitionedRegion pregion = (PartitionedRegion) region; + int totalBuckets = pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets(); + Map> bucketMap = pregion.getRegionAdvisor().getAllClientBucketProfiles(); + HashMap> serverMap = bucketServerMap2ServerBucketSetMap(bucketMap); + metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, kTypeName, vTypeName); + } + + ResultSender sender = context.getResultSender(); + sender.lastResult(metadata); + } + + private String getTypeClassName(Class clazz) { + return clazz == null ? null : clazz.getCanonicalName(); + } + + /** convert bucket to server map to server to bucket set map */ + private HashMap> + bucketServerMap2ServerBucketSetMap(Map> map) { + HashMap> serverBucketMap = new HashMap<>(); + for (Integer id : map.keySet()) { + List locations = map.get(id); + for (BucketServerLocation66 location : locations) { + ServerLocation server = new ServerLocation(location.getHostName(), location.getPort()); + if (location.isPrimary()) { + HashSet set = serverBucketMap.get(server); + if (set == null) { + set = new HashSet<>(); + serverBucketMap.put(server, set); + } + set.add(id); + break; + } + } + } + return serverBucketMap; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java new file mode 100644 index 0000000..f79c948 --- /dev/null +++ b/gemfire-spark-connector/gemfire-functions/src/main/java/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSender.java @@ -0,0 +1,203 @@ +package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.cache.execute.ResultSender; +import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl; +import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl; +import com.gemstone.gemfire.cache.query.types.ObjectType; +import com.gemstone.gemfire.cache.query.types.StructType; +import com.gemstone.gemfire.internal.HeapDataOutputStream; +import 
com.gemstone.gemfire.internal.cache.CachedDeserializable; +import com.gemstone.gemfire.internal.logging.LogService; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.Iterator; + +/** + * StructStreamingResultSender and StructStreamingResultCollector are paired + * to transfer result of list of `com.gemstone.gemfire.cache.query.Struct` + * from GemFire server to Spark Connector (the client of GemFire server) + * in streaming, i.e., while sender sending the result, the collector can + * start processing the arrived result without waiting for full result to + * become available. + */ +public class StructStreamingResultSender { + + public static final byte TYPE_CHUNK = 0x30; + public static final byte DATA_CHUNK = 0x31; + public static final byte ERROR_CHUNK = 0x32; + public static final byte SER_DATA = 0x41; + public static final byte UNSER_DATA = 0x42; + public static final byte BYTEARR_DATA = 0x43; + + private static ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class); + public static StructTypeImpl KeyValueType = new StructTypeImpl(new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField}); + + private static final Logger logger = LogService.getLogger(); + private static final int CHUNK_SIZE = 4096; + + // Note: The type of ResultSender returned from GemFire FunctionContext is + // always ResultSender, so can't use ResultSender here + private final ResultSender sender; + private final StructType structType; + private final Iterator rows; + private String desc; + private boolean closed = false; + + /** + * the Constructor + * @param sender the base ResultSender that send data in byte array + * @param type the StructType of result record + * @param rows the iterator of the collection of results + * @param desc description of this result (used for logging) + */ + public StructStreamingResultSender( + ResultSender sender, StructType type, Iterator rows, String desc) { + if (sender == null || rows == null) + throw new NullPointerException("sender=" + sender + ", rows=" + rows); + this.sender = sender; + this.structType = type; + this.rows = rows; + this.desc = desc; + } + + /** the Constructor with default `desc` */ + public StructStreamingResultSender( + ResultSender sender, StructType type, Iterator rows) { + this(sender, type, rows, "StructStreamingResultSender"); + } + + /** + * Send the result in chunks. There are 3 types of chunk: TYPE, DATA, and ERROR. + * TYPE chunk for sending struct type info, DATA chunk for sending data, and + * ERROR chunk for sending exception. There are at most 1 TYPE chunk (omitted + * for `KeyValueType`) and 1 ERROR chunk (if there's error), but usually + * there are multiple DATA chunks. Each DATA chunk contains multiple rows + * of data. The chunk size is determined by the const `CHUNK_SIZE`. If an + * exception is thrown, it is serialized and sent as the last chunk of the + * result (in the form of ERROR chunk). + */ + public void send() { + if (closed) throw new RuntimeException("sender is closed."); + + HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null); + String dataType = null; + int typeSize = 0; + int rowCount = 0; + int dataSize = 0; + try { + if (rows.hasNext()) { + // Note: only send type info if there's data with it + typeSize = sendType(buf); + buf.writeByte(DATA_CHUNK); + int rowSize = structType == null ? 
2 : structType.getFieldNames().length; + while (rows.hasNext()) { + rowCount ++; + Object[] row = rows.next(); + if (rowCount < 2) dataType = entryDataType(row); + if (rowSize != row.length) + throw new IOException(rowToString("Expect " + rowSize + " columns, but got ", row)); + serializeRowToBuffer(row, buf); + if (buf.size() > CHUNK_SIZE) { + dataSize += sendBufferredData(buf, false); + buf.writeByte(DATA_CHUNK); + } + } + } + // send last piece of data or empty byte array + dataSize += sendBufferredData(buf, true); + logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" + + typeSize + ", data.size=" + dataSize + ", row.avg.size=" + + (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) dataSize)/rowCount))); + } catch (IOException | RuntimeException e) { + sendException(buf, e); + } finally { + closed = true; + } + } + + private String rowToString(String rowDesc, Object[] row) { + StringBuilder buf = new StringBuilder(); + buf.append(rowDesc).append("("); + for (int i = 0; i < row.length; i++) buf.append(i ==0 ? "" : " ,").append(row[i]); + return buf.append(")") .toString(); + } + + private String entryDataType(Object[] row) { + StringBuilder buf = new StringBuilder(); + buf.append("("); + for (int i = 0; i < row.length; i++) { + if (i != 0) buf.append(", "); + buf.append(row[i].getClass().getCanonicalName()); + } + return buf.append(")").toString(); + } + + private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException { + for (Object data : row) { + if (data instanceof CachedDeserializable) { + buf.writeByte(SER_DATA); + DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf); + } else if (data instanceof byte[]) { + buf.writeByte(BYTEARR_DATA); + DataSerializer.writeByteArray((byte[]) data, buf); + } else { + buf.writeByte(UNSER_DATA); + DataSerializer.writeObject(data, buf); + } + } + } + + /** return the size of type data */ + private int sendType(HeapDataOutputStream buf) throws IOException { + // logger.info(desc + " struct type: " + structType); + if (structType != null) { + buf.writeByte(TYPE_CHUNK); + DataSerializer.writeObject(structType, buf); + return sendBufferredData(buf, false); + } else { + return 0; // default KeyValue type, no type info send + } + } + + private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) throws IOException { + if (isLast) sender.lastResult(buf.toByteArray()); + else sender.sendResult(buf.toByteArray()); + // logData(buf.toByteArray(), desc); + int s = buf.size(); + buf.reset(); + return s; + } + + /** Send the exception as the last chunk of the result. */ + private void sendException(HeapDataOutputStream buf, Exception e) { + // Note: if exception happens during the serialization, the `buf` may contain + // partial serialized data, which may cause de-serialization hang or error. + // Therefore, always empty the buffer before sending the exception + if (buf.size() > 0) buf.reset(); + + try { + buf.writeByte(ERROR_CHUNK); + DataSerializer.writeObject(e, buf); + } catch (IOException ioe) { + logger.error("StructStreamingResultSender failed to send the result:", e); + logger.error("StructStreamingResultSender failed to serialize the exception:", ioe); + buf.reset(); + } + // Note: send empty chunk as the last result if serialization of exception + // failed, and the error is logged on the GemFire server side. 
+    sender.lastResult(buf.toByteArray());
+    // logData(buf.toByteArray(), desc);
+  }
+
+//  private void logData(byte[] data, String desc) {
+//    StringBuilder buf = new StringBuilder();
+//    buf.append(desc);
+//    for (byte b : data) {
+//      buf.append(" ").append(b);
+//    }
+//    logger.info(buf.toString());
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java b/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
new file mode 100644
index 0000000..ed7cef0
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Employee.java
@@ -0,0 +1,38 @@
+package ittest.io.pivotal.gemfire.spark.connector;
+
+import java.io.Serializable;
+
+public class Employee implements Serializable {
+
+  private String name;
+
+  private int age;
+
+  public Employee(String n, int a) {
+    name = n;
+    age = a;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getAge() {
+    return age;
+  }
+
+  public String toString() {
+    return new StringBuilder().append("Employee[name=").append(name).
+        append(", age=").append(age).
+        append("]").toString();
+  }
+
+  public boolean equals(Object o) {
+    if (o instanceof Employee) {
+      return ((Employee) o).name.equals(name) && ((Employee) o).age == age;
+    }
+    return false;
+  }
+
+}
+
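----------------------------------------------------------------------
The chunk protocol produced by StructStreamingResultSender above (an optional TYPE chunk, one or more DATA chunks whose fields are tagged SER_DATA, UNSER_DATA or BYTEARR_DATA, and an ERROR chunk on failure) is consumed on the Spark side by StructStreamingResultCollector.scala, which is part of this commit but not shown in this message. The following is only a minimal Java sketch of such a decoder, assuming the chunks arrive as an Iterator<byte[]>; the ChunkDecoder class name is illustrative and not part of the commit.

import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.query.types.StructType;
import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.StructStreamingResultSender;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkDecoder {

  /** Decode all chunks into rows of Object[]; each row has one entry per struct field. */
  public static List<Object[]> decode(Iterator<byte[]> chunks) throws Exception {
    List<Object[]> rows = new ArrayList<>();
    int fieldCount = 2;                       // default "(key, value)" layout when no TYPE chunk is sent
    while (chunks.hasNext()) {
      byte[] chunk = chunks.next();
      if (chunk.length == 0) continue;        // empty last chunk is sent when there is no data
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(chunk));
      byte chunkType = in.readByte();
      if (chunkType == StructStreamingResultSender.TYPE_CHUNK) {
        StructType type = DataSerializer.readObject(in);
        fieldCount = type.getFieldNames().length;
      } else if (chunkType == StructStreamingResultSender.ERROR_CHUNK) {
        // the sender serializes the server-side exception as the last chunk
        throw new RuntimeException((Exception) DataSerializer.readObject(in));
      } else if (chunkType == StructStreamingResultSender.DATA_CHUNK) {
        // a DATA chunk carries several rows; each field is preceded by a marker byte
        while (in.available() > 0) {
          Object[] row = new Object[fieldCount];
          for (int i = 0; i < fieldCount; i++) {
            byte marker = in.readByte();
            if (marker == StructStreamingResultSender.BYTEARR_DATA) {
              row[i] = DataSerializer.readByteArray(in);
            } else if (marker == StructStreamingResultSender.SER_DATA) {
              // field was shipped as a serialized byte[]; deserialize it here
              byte[] bytes = DataSerializer.readByteArray(in);
              row[i] = DataSerializer.readObject(
                  new DataInputStream(new ByteArrayInputStream(bytes)));
            } else {                           // UNSER_DATA
              row[i] = DataSerializer.readObject(in);
            }
          }
          rows.add(row);
        }
      }
    }
    return rows;
  }
}

A real collector would hand rows to the consumer as they are decoded rather than materializing the whole list; that streaming behavior is the point of pairing the sender with a collector.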
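----------------------------------------------------------------------
RetrieveRegionMetadataFunction takes no arguments and returns a single RegionMetadata object through lastResult(). As a rough illustration only (the connector itself wires this up through its GemFireConnection and GemFireFunctionDeployer classes, not through plain client code), a GemFire client could invoke it by ID once the gemfire-functions jar has been deployed to the servers. The locator address and region name below are placeholders.

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import io.pivotal.gemfire.spark.connector.internal.RegionMetadata;

import java.util.List;

public class RegionMetadataExample {

  public static void main(String[] args) {
    // connect to the cluster; host/port are placeholders
    ClientCache cache = new ClientCacheFactory()
        .addPoolLocator("localhost", 10334)
        .create();
    Region<Object, Object> region = cache
        .createClientRegionFactory(ClientRegionShortcut.PROXY)
        .create("exampleRegion");              // placeholder region name

    // execute by function ID; the function class itself lives on the servers
    ResultCollector<?, ?> rc = FunctionService.onRegion(region)
        .execute("gemfire-spark-retrieve-region-metadata");

    // the function sends exactly one RegionMetadata via lastResult()
    List<?> results = (List<?>) rc.getResult();
    RegionMetadata metadata = (RegionMetadata) results.get(0);
    System.out.println("metadata: " + metadata);

    cache.close();
  }
}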