geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasonhu...@apache.org
Subject [06/11] incubator-geode git commit: GEODE-1244: Package, directory, project and file rename for geode-spark-connector
Date Wed, 20 Apr 2016 22:51:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala
deleted file mode 100644
index 48f83c9..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.gemfire.spark.connector.rdd
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
-import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDPartition, GemFireRegionRDD}
-import io.pivotal.gemfire.spark.connector.{GemFireConnectionConf, GemFireConnection}
-import org.apache.spark.{TaskContext, Partition, SparkContext}
-import org.mockito.Mockito._
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-
-import scala.reflect.ClassTag
-
-class GemFireRegionRDDTest extends FunSuite with Matchers with MockitoSugar {
-
-  /** create common mocks, not all mocks are used by all tests */
-  def createMocks[K, V](regionPath: String)(implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
-    : (String, Region[K,V], GemFireConnectionConf, GemFireConnection) = {
-    val mockConnection = mock[GemFireConnection]
-    val mockRegion = mock[Region[K, V]]
-    val mockConnConf = mock[GemFireConnectionConf]
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion)
-    when(mockConnConf.locators).thenReturn(Seq.empty)
-    (regionPath, mockRegion, mockConnConf, mockConnection)
-  }
-  
-  test("create GemFireRDD with non-existing region") {
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnection.validateRegion[String,String](regionPath)).thenThrow(new RuntimeException)
-    val mockSparkContext = mock[SparkContext]
-    intercept[RuntimeException] { GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) }
-    verify(mockConnConf).getConnection
-    verify(mockConnection).validateRegion[String, String](regionPath)
-  }
-  
-  test("getPartitions with non-existing region") {
-    // the region exists when the RDD is created, but is removed before getPartitions() is invoked
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
-    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(None)
-    val mockSparkContext = mock[SparkContext]
-    intercept[RuntimeException] { GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf).getPartitions }
-  }
-
-  test("getPartitions with replicated region and not preferred env") {
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
-    implicit val mockConnConf2 = mockConnConf
-    val mockSparkContext = mock[SparkContext]
-    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null)))
-    val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions
-    verifySinglePartition(partitions)
-  }
-
-  def verifySinglePartition(partitions: Array[Partition]): Unit = {
-    assert(1 == partitions.size)
-    assert(partitions(0).index === 0)
-    assert(partitions(0).isInstanceOf[GemFireRDDPartition])
-    assert(partitions(0).asInstanceOf[GemFireRDDPartition].bucketSet.isEmpty)
-  }
-
-  test("getPartitions with replicated region and preferred OnePartitionPartitioner") {
-    // the region is replicated, so OnePartitionPartitioner will be used, i.e., it overrides the preferred partitioner
-    import io.pivotal.gemfire.spark.connector.{PreferredPartitionerPropKey, OnePartitionPartitionerName}
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
-    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null)))
-    implicit val mockConnConf2 = mockConnConf
-    val mockSparkContext = mock[SparkContext]
-    val env = Map(PreferredPartitionerPropKey -> OnePartitionPartitionerName)
-    val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf, env).partitions
-    verifySinglePartition(partitions)
-  }
-
-  test("getPartitions with partitioned region and not preferred env") {
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
-    implicit val mockConnConf2 = mockConnConf
-    val mockSparkContext = mock[SparkContext]
-    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null)))
-    val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions
-    verifySinglePartition(partitions)
-  }
-
-  test("GemFireRDD.compute() method") {
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test")
-    implicit val mockConnConf2 = mockConnConf
-    val mockIter = mock[Iterator[(String, String)]]
-    val partition = GemFireRDDPartition(0, Set.empty)
-    when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null)))
-    when(mockConnection.getRegionData[String, String](regionPath, None, partition)).thenReturn(mockIter)
-    val mockSparkContext = mock[SparkContext]
-    val rdd = GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf)
-    val partitions = rdd.partitions
-    assert(1 == partitions.size)
-    val mockTaskContext = mock[TaskContext]
-    rdd.compute(partitions(0), mockTaskContext)        
-    verify(mockConnection).getRegionData[String, String](mockEq(regionPath), mockEq(None), mockEq(partition))
-    // verify(mockConnection).getRegionData[String, String](regionPath, Set.empty.asInstanceOf[Set[Int]], "gemfireRDD 0.0")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java
deleted file mode 100644
index 03e15a0..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import java.io.Serializable;
-
-/**
- * This is a demo class used in doc/?.md
- */
-public class Emp implements Serializable {
-
-  private int id;
-  
-  private String lname;
-
-  private String fname;
-
-  private int age;
-
-  private String loc;
-
-  public Emp(int id, String lname, String fname, int age, String loc) {
-    this.id = id;
-    this.lname = lname;
-    this.fname = fname;
-    this.age = age;
-    this.loc = loc;
-  }
-
-  public int getId() {
-    return id;
-  }
-
-  public String getLname() {
-    return lname;
-  }
-
-  public String getFname() {
-    return fname;
-  }
-
-  public int getAge() {
-    return age;
-  }
-
-  public String getLoc() {
-    return loc;
-  }
-
-  @Override
-  public String toString() {
-    return "Emp(" + id + ", " + lname + ", " + fname + ", " + age + ", " + loc + ")";
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    Emp emp = (Emp) o;
-
-    if (age != emp.age) return false;
-    if (id != emp.id) return false;
-    if (fname != null ? !fname.equals(emp.fname) : emp.fname != null) return false;
-    if (lname != null ? !lname.equals(emp.lname) : emp.lname != null) return false;
-    if (loc != null ? !loc.equals(emp.loc) : emp.loc != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = id;
-    result = 31 * result + (lname != null ? lname.hashCode() : 0);
-    result = 31 * result + (fname != null ? fname.hashCode() : 0);
-    result = 31 * result + age;
-    result = 31 * result + (loc != null ? loc.hashCode() : 0);
-    return result;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
deleted file mode 100644
index 41654a5..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-
-
-/**
- * This Spark application demonstrates how to get region data from GemFire using GemFire
- * OQL Java API. The result is a Spark DataFrame.
- * <p>
- * In order to run it, you will need to start a GemFire cluster, and run demo PairRDDSaveJavaDemo
- * first to create some data in the region.
- * <p>
- * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
- * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. 
- * Then run the following command to start a Spark job:
- * <pre>
- *   <path to spark>/bin/spark-submit --master=local[2] --class demo.OQLJavaDemo \
- *       <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>
- * </pre>
- */
-public class OQLJavaDemo {
-
-  public static void main(String[] argv) {
-
-    if (argv.length != 1) {
-      System.err.printf("Usage: OQLJavaDemo <locators>\n");
-      return;
-    }
-
-    SparkConf conf = new SparkConf().setAppName("OQLJavaDemo");
-    conf.set(GemFireLocatorPropKey, argv[0]); // "192.168.1.47[10335]"
-    JavaSparkContext sc = new JavaSparkContext(conf);
-    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
-    DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region");
-    System.out.println("======= DataFrame =======\n");
-    df.show();
-    sc.stop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
deleted file mode 100644
index 84f87af..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
-import java.util.*;
-
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-
-/**
- * This Spark application demonstrates how to save a RDD to GemFire using GemFire Spark 
- * Connector with Java.
- * <p/>
- * In order to run it, you will need to start GemFire cluster, and create the following region
- * with GFSH:
- * <pre>
- * gfsh> create region --name=str_str_region --type=REPLICATE \
- *         --key-constraint=java.lang.String --value-constraint=java.lang.String
- * </pre>
- * 
- * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
- * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. 
- * Then run the following command to start a Spark job:
- * <pre>
- *   <path to spark>/bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo \
- *       <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>
- * </pre>
- * 
- * Verify the data was saved to GemFire with GFSH:
- * <pre>gfsh> query --query="select * from /str_str_region.entrySet"  </pre>
- */
-public class PairRDDSaveJavaDemo {
-
-  public static void main(String[] argv) {
-
-    if (argv.length != 1) {
-      System.err.printf("Usage: PairRDDSaveJavaDemo <locators>\n");
-      return;
-    }
-
-    SparkConf conf = new SparkConf().setAppName("PairRDDSaveJavaDemo");
-    conf.set(GemFireLocatorPropKey, argv[0]);
-    JavaSparkContext sc = new JavaSparkContext(conf);
-    GemFireConnectionConf connConf = GemFireConnectionConf.apply(conf);
-
-    List<Tuple2<String, String>> data = new ArrayList<>();
-    data.add(new Tuple2<>("7", "seven"));
-    data.add(new Tuple2<>("8", "eight"));
-    data.add(new Tuple2<>("9", "nine"));
-
-    List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
-    data2.add(new Tuple2<>("11", "eleven"));
-    data2.add(new Tuple2<>("12", "twelve"));
-    data2.add(new Tuple2<>("13", "thirteen"));
-
-    // method 1: generate JavaPairRDD directly
-    JavaPairRDD<String, String> rdd1 =  sc.parallelizePairs(data);
-    javaFunctions(rdd1).saveToGemfire("str_str_region", connConf);
-
-    // method 2: convert JavaRDD<Tuple2<K,V>> to JavaPairRDD<K, V>
-    JavaRDD<Tuple2<String, String>> rdd2 =  sc.parallelize(data2);
-    javaFunctions(toJavaPairRDD(rdd2)).saveToGemfire("str_str_region", connConf);
-       
-    sc.stop();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
deleted file mode 100644
index 5fc5aeb..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-
-/**
- * This Spark application demonstrates how to save a RDD to GemFire using GemFire Spark
- * Connector with Java.
- * <p/>
- * In order to run it, you will need to start GemFire cluster, and create the following region
- * with GFSH:
- * <pre>
- * gfsh> create region --name=str_int_region --type=REPLICATE \
- *         --key-constraint=java.lang.String --value-constraint=java.lang.Integer
- * </pre>
- *
- * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
- * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/.
- * Then run the following command to start a Spark job:
- * <pre>
- *   <path to spark>/bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo \
- *       <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>
- * </pre>
- *
- * Verify the data was saved to GemFire with GFSH:
- * <pre>gfsh> query --query="select * from /str_int_region.entrySet"  </pre>
- */
-public class RDDSaveJavaDemo {
-
-  public static void main(String[] argv) {
-
-    if (argv.length != 1) {
-      System.err.printf("Usage: RDDSaveJavaDemo <locators>\n");
-      return;
-    }
-
-    SparkConf conf = new SparkConf().setAppName("RDDSaveJavaDemo");
-    conf.set(GemFireLocatorPropKey, argv[0]);
-    JavaSparkContext sc = new JavaSparkContext(conf);
-
-    List<String> data = new ArrayList<String>();
-    data.add("abcdefg");
-    data.add("abcdefgh");
-    data.add("abcdefghi");
-    JavaRDD<String> rdd =  sc.parallelize(data);
-
-    GemFireConnectionConf connConf = GemFireConnectionConf.apply(conf);
-
-    PairFunction<String, String, Integer> func =  new PairFunction<String, String, Integer>() {
-      @Override public Tuple2<String, Integer> call(String s) throws Exception {
-        return new Tuple2<String, Integer>(s, s.length());
-      }
-    };
-
-    javaFunctions(rdd).saveToGemfire("str_int_region", func, connConf);
-
-    sc.stop();
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
deleted file mode 100644
index 7c1d7bb..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
-
-/**
- * This Spark application demonstrates how to expose a region in GemFire as a RDD using GemFire
- * Spark Connector with Java.
- * <p>
- * In order to run it, you will need to start GemFire cluster, and run demo PairRDDSaveJavaDemo
- * first to create some data in the region.
- * <p>
- * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar
- * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. 
- * Then run the following command to start a Spark job:
- * <pre>
- *   <path to spark>/bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo \
- *       <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>
- * </pre>
- */
-public class RegionToRDDJavaDemo {
-
-  public static void main(String[] argv) {
-
-    if (argv.length != 1) {
-      System.err.printf("Usage: RegionToRDDJavaDemo <locators>\n");
-      return;
-    }
-    
-    SparkConf conf = new SparkConf().setAppName("RegionToRDDJavaDemo"); 
-    conf.set(GemFireLocatorPropKey, argv[0]);
-    JavaSparkContext sc = new JavaSparkContext(conf);
-
-    JavaPairRDD<String, String> rdd = javaFunctions(sc).gemfireRegion("str_str_region");
-    System.out.println("=== gemfireRegion =======\n" + rdd.collect() + "\n=========================");
-    
-    sc.stop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
deleted file mode 100644
index f67c32e..0000000
--- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package demo
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import io.pivotal.gemfire.spark.connector.GemFireLocatorPropKey
-import io.pivotal.gemfire.spark.connector.streaming._
-
-/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * <p><p>
- * In order to run it, you will need to start GemFire cluster, and create the following region
- * with GFSH:
- * <pre>
- * gfsh> create region --name=str_int_region --type=REPLICATE \
- *         --key-constraint=java.lang.String --value-constraint=java.lang.Integer
- * </pre> 
- *
- * <p>To run this on your local machine, you need to first run a netcat server
- *    `$ nc -lk 9999`
- * and then run the example
- *    `$ bin/spark-submit --master=local[2] --class demo.NetworkWordCount <path to>/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port`
- * 
- * <p><p> check result that was saved to GemFire with GFSH:
- * <pre>gfsh> query --query="select * from /str_int_region.entrySet"  </pre>
- */
-object NetworkWordCount {
-  
-  def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: NetworkWordCount <hostname> <port> <gemfire locator>")
-      System.exit(1)
-    }
-
-    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
-      val currentCount = values.foldLeft(0)(_ + _)
-      val previousCount = state.getOrElse(0)
-      Some(currentCount + previousCount)
-    }
-    
-    // Create the context with a 1 second batch size
-    val sparkConf = new SparkConf().setAppName("NetworkWordCount").set(GemFireLocatorPropKey, args(2))
-    val ssc = new StreamingContext(sparkConf, Seconds(1))
-    ssc.checkpoint(".")
-    
-    // Create a socket stream on target ip:port and count the
-    // words in input stream of \n delimited text (eg. generated by 'nc')
-    // Note: a non-replicated storage level is suitable only when running locally;
-    // replication is necessary in a distributed scenario for fault tolerance.
-    val lines = ssc.socketTextStream(args(0), args(1).toInt)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)
-    // runningCounts.print()
-    runningCounts.saveToGemfire("str_int_region")
-    ssc.start()
-    ssc.awaitTermination()
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
new file mode 100644
index 0000000..adee52f
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.io.Serializable;
+
+/**
+ * This class contains all info required by GemFire RDD partitioner to create partitions.
+ */
+public class RegionMetadata implements Serializable {
+
+  private String  regionPath;
+  private boolean isPartitioned;
+  private int     totalBuckets;
+  private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap;
+  private String  keyTypeName;
+  private String  valueTypeName;
+
+  /**
+   * Full constructor: initializes every field, including the key and value type names.
+   * @param regionPath the full path of the given region
+   * @param isPartitioned true for partitioned region, false otherwise
+   * @param totalBuckets number of total buckets for partitioned region, ignored otherwise
+   * @param serverBucketMap geode server (host:port pair) to bucket set map
+   * @param keyTypeName region key class name
+   * @param valueTypeName region value class name                    
+   */
+  public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap,
+                        String keyTypeName, String valueTypeName) {
+    this.regionPath = regionPath;
+    this.isPartitioned = isPartitioned;
+    this.totalBuckets = totalBuckets;
+    this.serverBucketMap = serverBucketMap;
+    this.keyTypeName = keyTypeName;
+    this.valueTypeName = valueTypeName;
+  }
+
+  public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) {
+    this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null);
+  }
+
+  public String getRegionPath() {
+    return regionPath;
+  }
+
+  public boolean isPartitioned() {
+    return isPartitioned;
+  }
+
+  public int getTotalBuckets() {
+    return totalBuckets;
+  }
+  
+  public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() {
+    return serverBucketMap;
+  }
+
+  public String getKeyTypeName() {
+    return keyTypeName;
+  }
+
+  public String getValueTypeName() {
+    return valueTypeName;
+  }
+
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("RegionMetadata(region=").append(regionPath)
+       .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")")
+       .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets)
+       .append(", map=").append(serverBucketMap).append(")");
+    return buf.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
new file mode 100644
index 0000000..e6800f1
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.execute.*;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+import java.util.Iterator;
+
+public class QueryFunction implements Function {
+
+  private static final long serialVersionUID = 4866641340803692882L;
+
+  public final static String ID = "geode-spark-query-function";
+
+  private final static QueryFunction instance = new QueryFunction();
+
+  private static final Logger logger = LogService.getLogger();
+
+  private static final int CHUNK_SIZE = 1024;
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  public static QueryFunction getInstance() {
+    return instance;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  /**
+   * Runs the OQL query passed in the function arguments and streams the result
+   * rows back to the caller as serialized byte-array chunks.
+   * <p>
+   * The arguments are a {@code String[]}: element 0 is the OQL query string and
+   * element 1 describes the bucket set (used in log messages only). Rows are
+   * serialized with {@link DataSerializer#writeObject} into one buffer; the buffer
+   * is flushed as an intermediate result whenever it grows past {@code CHUNK_SIZE}
+   * bytes, and whatever remains (possibly an empty array) is sent as the last result.
+   *
+   * @param context the function context; it is cast to {@link InternalRegionFunctionContext}
+   * @throws FunctionException wrapping any exception raised while querying,
+   *         serializing or sending
+   */
+  @Override
+  public void execute(FunctionContext context) {
+    try {
+      String[] args = (String[]) context.getArguments();
+      String queryString = args[0];
+      String bucketSet = args[1];
+      InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
+      LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+      boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+      Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString);
+      // A partitioned region is queried through the function context, any other region directly.
+      Object result = partitioned ? query.execute(irfc) : query.execute();
+      ResultSender<Object> sender = context.getResultSender();
+      HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null);
+      Iterator<Object> iter = ((SelectResults) result).asList().iterator();
+      while (iter.hasNext()) {
+        Object row = iter.next();
+        DataSerializer.writeObject(row, buf);
+        if (buf.size() > CHUNK_SIZE) {
+          sender.sendResult(buf.toByteArray());
+          // Parameterized logging: the message is only assembled when debug is enabled.
+          logger.debug("OQL query={} bucket set={} sendResult(), data size={}", queryString, bucketSet, buf.size());
+          buf.reset();
+        }
+      }
+      sender.lastResult(buf.toByteArray());
+      logger.debug("OQL query={} bucket set={} lastResult(), data size={}", queryString, bucketSet, buf.size());
+      buf.reset();
+    }
+    catch(Exception e) {
+      throw new FunctionException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
new file mode 100644
index 0000000..6338aa5
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import java.util.Iterator;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.internal.cache.*;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import com.gemstone.gemfire.internal.cache.execute.InternalResultSender;
+import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * GemFire function used by `SparkContext.geodeRegion(regionPath, whereClause)`
+ * to retrieve the region data for a given bucket set as an RDD partition.
+ **/
+public class RetrieveRegionFunction implements Function {
+
+  public final static String ID = "spark-geode-retrieve-region";
+  private static final Logger logger = LogService.getLogger();
+  private static final RetrieveRegionFunction instance = new RetrieveRegionFunction();
+
+  public RetrieveRegionFunction() {
+  }
+
+  /** ------------------------------------------ */
+  /**     interface Function implementation      */
+  /** ------------------------------------------ */
+
+  public static RetrieveRegionFunction getInstance() {
+    return instance;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    String[] args = (String[]) context.getArguments();
+    String where = args[0];
+    String taskDesc = args[1];
+    InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
+    LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+    boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+    if (where.trim().isEmpty())
+      retrieveFullRegion(irfc, partitioned, taskDesc);
+    else
+      retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc);
+  }
+
+  /** ------------------------------------------ */
+  /**    Retrieve region data with where clause  */
+  /** ------------------------------------------ */
+
+  /**
+   * Runs {@code select key, value from <regionPath>.entries where <where>} and
+   * streams the resulting key/value structs to the caller.
+   * The where clause is concatenated into the query text as-is. Any exception
+   * thrown while querying or sending is rethrown wrapped in a {@link FunctionException}.
+   */
+  private void retrieveRegionWithWhereClause(
+    InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) {
+    String oql = "select key, value from " + localRegion.getFullPath() + ".entries where " + where;
+    logger.info(desc + ": " + oql);
+
+    try {
+      Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(oql);
+      Object raw;
+      if (partitioned) {
+        raw = query.execute(context);
+      } else {
+        raw = query.execute();
+      }
+      SelectResults<Struct> results = (SelectResults<Struct>) raw;
+
+      Iterator<Object[]> rows = getStructIteratorWrapper(results.asList().iterator());
+      InternalResultSender resultSender = (InternalResultSender) context.getResultSender();
+      new StructStreamingResultSender(resultSender, null, rows, desc).send();
+    } catch (Exception e) {
+      throw new FunctionException(e);
+    }
+  }
+
+  private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> entries) {
+    return new WrapperIterator<Struct, Iterator<Struct>>(entries) {
+      @Override public Object[] next() {
+        return  delegate.next().getFieldValues();
+      }
+    };
+  }
+
+  /** ------------------------------------------ */
+  /**         Retrieve full region data          */
+  /** ------------------------------------------ */
+
+  /**
+   * Streams every entry of the local data set to the caller as key/value pairs.
+   * For a partitioned region the entries are taken from the local data of the
+   * function context; otherwise they are taken from the context's data set itself.
+   */
+  private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) {
+    Iterator<Region.Entry> source;
+    if (!partitioned) {
+      LocalRegion owner = (LocalRegion) context.getDataSet();
+      // disabled alternative: getRREntryIterator(source, owner)
+      source = (Iterator<Region.Entry>) owner.entrySet().iterator();
+    } else {
+      LocalDataSet localData = (LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context);
+      // disabled alternative: getPREntryIterator(source)
+      source = (PREntriesIterator<Region.Entry>) localData.entrySet().iterator();
+    }
+    Iterator<Object[]> entries = getSimpleEntryIterator(source);
+    InternalResultSender resultSender = (InternalResultSender) context.getResultSender();
+    new StructStreamingResultSender(resultSender, null, entries, desc).send();
+  }
+
+//  /** An iterator for partitioned region that uses internal API to get serialized value */
+//  private Iterator<Object[]> getPREntryIterator(PREntriesIterator<Region.Entry> iterator) {
+//    return new WrapperIterator<Region.Entry, PREntriesIterator<Region.Entry>>(iterator) {
+//      @Override public Object[] next() {
+//        Region.Entry entry = delegate.next();
+//        int bucketId = delegate.getBucketId();
+//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId);
+//        // owner needs to be the bucket region not the enclosing partition region
+//        LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId);
+//        Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
+//        return new Object[] {keyInfo.getKey(), value};
+//      }
+//    };
+//  }
+//
+//  /** An iterator for replicated region that uses internal API to get serialized value */
+//  private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> iterator, LocalRegion region) {
+//    final LocalRegion owner = region;
+//    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
+//      @Override public Object[] next() {
+//        Region.Entry entry =  delegate.next();
+//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null);
+//        Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
+//        return new Object[] {keyInfo.getKey(), value};
+//      }
+//    };
+//  }
+
+  // todo. compare performance of regular and simple iterator
+  /**
+   * A general iterator for both partitioned and replicated regions: each row is
+   * {key, value}, read through {@code Region.Entry.getKey()} and {@code getValue()}.
+   */
+  private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> iterator) {
+    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
+      @Override
+      public Object[] next() {
+        Region.Entry current = delegate.next();
+        Object key = current.getKey();
+        Object value = current.getValue();
+        return new Object[] {key, value};
+      }
+    };
+  }
+
+  /** ------------------------------------------ */
+  /**        abstract wrapper iterator           */
+  /** ------------------------------------------ */
+
+  /**
+   * An abstract wrapper iterator to reduce duplicated code of anonymous iterators.
+   * Subclasses implement {@code next()} to turn each element of the wrapped
+   * iterator into an {@code Object[]} row; {@code hasNext()} is forwarded unchanged.
+   *
+   * @param <T> element type of the wrapped iterator
+   * @param <S> concrete type of the wrapped iterator
+   */
+  abstract class WrapperIterator<T, S extends Iterator<T>> implements Iterator<Object[]> {
+
+    /** The wrapped iterator. */
+    final S delegate;
+
+    protected WrapperIterator(S delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override public boolean hasNext() {
+      return delegate.hasNext();
+    }
+
+    /**
+     * Removal is not supported by this wrapper. It fails loudly, as the
+     * {@link Iterator#remove()} contract asks for, rather than doing nothing.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override public void remove() {
+      throw new UnsupportedOperationException("remove");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
new file mode 100644
index 0000000..ce4dac3
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
+import io.pivotal.geode.spark.connector.internal.RegionMetadata;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This GemFire function retrieves region metadata.
+ */
+public class RetrieveRegionMetadataFunction implements Function {
+
+  public final static String ID = "geode-spark-retrieve-region-metadata";
+  
+  private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction();
+
+  public RetrieveRegionMetadataFunction() {
+  }
+
+  public static RetrieveRegionMetadataFunction getInstance() {
+    return instance;    
+  }
+  
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  /**
+   * Collects the metadata of the region this function is executed on and sends it
+   * back as the single (last) result.
+   * For a partitioned region the metadata carries the total bucket count and a
+   * server-to-bucket-set map; for any other region the bucket count is 0 and the
+   * map is {@code null}.
+   */
+  @Override
+  public void execute(FunctionContext context) {
+    LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet();
+    String path = region.getFullPath();
+    boolean partitioned = region.getDataPolicy().withPartitioning();
+    String keyType = getTypeClassName(region.getAttributes().getKeyConstraint());
+    String valueType = getTypeClassName(region.getAttributes().getValueConstraint());
+
+    int totalBuckets = 0;
+    HashMap<ServerLocation, HashSet<Integer>> serverMap = null;
+    if (partitioned) {
+      PartitionedRegion pr = (PartitionedRegion) region;
+      totalBuckets = pr.getAttributes().getPartitionAttributes().getTotalNumBuckets();
+      Map<Integer, List<BucketServerLocation66>> bucketMap = pr.getRegionAdvisor().getAllClientBucketProfiles();
+      serverMap = bucketServerMap2ServerBucketSetMap(bucketMap);
+    }
+    RegionMetadata metadata = new RegionMetadata(path, partitioned, totalBuckets, serverMap, keyType, valueType);
+
+    ResultSender<RegionMetadata> sender = context.getResultSender();
+    sender.lastResult(metadata);
+  }
+  
+  /** Returns the canonical name of {@code clazz}, or {@code null} when {@code clazz} is {@code null}. */
+  private String getTypeClassName(Class clazz) {
+    if (clazz == null) {
+      return null;
+    }
+    return clazz.getCanonicalName();
+  }
+  
+  /**
+   * Converts a bucket-to-servers map into a server-to-bucket-set map.
+   * Only the first location flagged as primary is used for each bucket; a bucket
+   * without a primary location does not appear in the result.
+   *
+   * @param map bucket id mapped to the list of server locations for that bucket
+   * @return each primary server mapped to the set of bucket ids it is primary for
+   */
+  private HashMap<ServerLocation, HashSet<Integer>>
+    bucketServerMap2ServerBucketSetMap(Map<Integer, List<BucketServerLocation66>> map) {
+    HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new HashMap<>();
+    // Iterate the entries so each location list is read without a second map lookup.
+    for (Map.Entry<Integer, List<BucketServerLocation66>> entry : map.entrySet()) {
+      for (BucketServerLocation66 location : entry.getValue()) {
+        if (!location.isPrimary()) continue;
+        // The map key is built only for the primary; other locations are never stored.
+        ServerLocation server = new ServerLocation(location.getHostName(), location.getPort());
+        HashSet<Integer> set = serverBucketMap.get(server);
+        if (set == null) {
+          set = new HashSet<>();
+          serverBucketMap.put(server, set);
+        }
+        set.add(entry.getKey());
+        break;
+      }
+    }
+    return serverBucketMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
new file mode 100644
index 0000000..853582d
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
+import com.gemstone.gemfire.cache.query.types.ObjectType;
+import com.gemstone.gemfire.cache.query.types.StructType;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * StructStreamingResultSender and StructStreamingResultCollector are paired
+ * to transfer a list of `com.gemstone.gemfire.cache.query.Struct` results
+ * from the GemFire server to the Spark Connector (the client of the GemFire
+ * server) in a streaming fashion, i.e., while the sender is still sending the
+ * result, the collector can start processing the chunks that have arrived
+ * without waiting for the full result to become available.
+ */
+public class StructStreamingResultSender {
+
+  public static final byte TYPE_CHUNK   = 0x30;
+  public static final byte DATA_CHUNK   = 0x31;
+  public static final byte ERROR_CHUNK  = 0x32;
+  public static final byte SER_DATA     = 0x41;
+  public static final byte UNSER_DATA   = 0x42;
+  public static final byte BYTEARR_DATA = 0x43;
+
+  private static ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class);
+  public static StructTypeImpl KeyValueType = new StructTypeImpl(new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField});
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int CHUNK_SIZE = 4096;
+  
+  // Note: The type of ResultSender returned from GemFire FunctionContext is
+  //  always ResultSender<Object>, so can't use ResultSender<byte[]> here
+  private final ResultSender<Object> sender;
+  private final StructType structType;
+  private final Iterator<Object[]> rows;
+  private String desc;
+  private boolean closed = false;
+
+  /**
+   * Creates a sender that streams {@code rows} through {@code sender}.
+   *
+   * @param sender the base ResultSender that sends data as byte arrays
+   * @param type the StructType of a result record; {@code null} selects the
+   *             default two-column key/value layout, for which no type info is sent
+   * @param rows the iterator over the result rows
+   * @param desc description of this result (used for logging)
+   * @throws NullPointerException if {@code sender} or {@code rows} is {@code null}
+   */
+  public StructStreamingResultSender(
+    ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, String desc) {
+    boolean missingArgument = sender == null || rows == null;
+    if (missingArgument) {
+      throw new NullPointerException("sender=" + sender + ", rows=" + rows);
+    }
+    this.desc = desc;
+    this.rows = rows;
+    this.structType = type;
+    this.sender = sender;
+  }
+
+  /**
+   * Same as the four-argument constructor, with the description fixed to
+   * "StructStreamingResultSender".
+   */
+  public StructStreamingResultSender(
+          ResultSender<Object> sender, StructType type, Iterator<Object[]> rows) {
+    this(sender, type, rows, "StructStreamingResultSender");
+  }
+  
+  /**
+   * Sends the result in chunks. There are 3 types of chunk: TYPE, DATA, and ERROR.
+   * A TYPE chunk carries the struct type info, a DATA chunk carries rows, and an
+   * ERROR chunk carries an exception. There is at most 1 TYPE chunk (omitted
+   * when no struct type was given, i.e. the default key/value layout) and at most
+   * 1 ERROR chunk, but usually there are multiple DATA chunks, each holding
+   * multiple rows. A DATA chunk is flushed once the buffer has grown past
+   * `CHUNK_SIZE`, so a chunk can be somewhat larger than that. The final chunk is
+   * always sent as the last result; it may hold only the DATA marker byte or,
+   * when there were no rows at all, be empty.
+   * <p>
+   * An IOException or RuntimeException raised while sending is not propagated:
+   * it is serialized and sent as the last chunk of the result (an ERROR chunk).
+   * The sender is single-use: it is marked closed when this method finishes, and
+   * a second call throws a RuntimeException.
+   */
+  public void send() {
+    if (closed) throw new RuntimeException("sender is closed.");
+
+    // capacity is CHUNK_SIZE plus extra room; presumably headroom for the row that crosses the threshold
+    HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null);
+    String dataType = null;
+    int typeSize = 0;
+    int rowCount = 0;
+    int dataSize = 0;            
+    try {
+      if (rows.hasNext()) {
+        // Note: only send type info if there's data with it
+        typeSize = sendType(buf);
+        buf.writeByte(DATA_CHUNK);
+        // expected column count: 2 (key, value) when no struct type was given
+        int rowSize = structType == null ? 2 : structType.getFieldNames().length;
+        while (rows.hasNext()) {
+          rowCount ++;
+          Object[] row = rows.next();          
+          // the type description in the log line is taken from the first row only
+          if (rowCount < 2) dataType = entryDataType(row);
+          if (rowSize != row.length) 
+            throw new IOException(rowToString("Expect "  + rowSize + " columns, but got ", row));
+          serializeRowToBuffer(row, buf);
+          if (buf.size() > CHUNK_SIZE) {
+            dataSize += sendBufferredData(buf, false);
+            // start the next DATA chunk right away
+            buf.writeByte(DATA_CHUNK);
+          }
+        }
+      }
+      // send last piece of data or empty byte array
+      dataSize += sendBufferredData(buf, true);
+      logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" +
+                  typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
+                  (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) dataSize)/rowCount)));
+    } catch (IOException | RuntimeException e) {
+      sendException(buf, e);
+    } finally {
+      closed = true;
+    }
+  }
+
+  /**
+   * Formats a row for an error message as {@code rowDesc(v0, v1, ...)}.
+   *
+   * @param rowDesc text placed in front of the parenthesized values
+   * @param row the row values; a {@code null} element is printed as "null"
+   * @return the formatted description
+   */
+  private String rowToString(String rowDesc, Object[] row) {
+    StringBuilder buf = new StringBuilder();
+    buf.append(rowDesc).append("(");
+    for (int i = 0; i < row.length; i++) {
+      // separator between values: comma first, then a space
+      if (i != 0) buf.append(", ");
+      buf.append(row[i]);
+    }
+    return buf.append(")").toString();
+  }
+
+  /**
+   * Describes the runtime types of a row as {@code (type0, type1, ...)}, using
+   * each element's canonical class name.
+   * A {@code null} element is reported as "null" instead of being dereferenced,
+   * so a row holding a null field cannot fail here with a NullPointerException.
+   *
+   * @param row the row whose element types are described
+   * @return the parenthesized, comma-separated type names
+   */
+  private String entryDataType(Object[] row) {
+    StringBuilder buf = new StringBuilder();
+    buf.append("(");
+    for (int i = 0; i < row.length; i++) {
+      if (i != 0) buf.append(", ");
+      Object field = row[i];
+      buf.append(field == null ? "null" : field.getClass().getCanonicalName());
+    }
+    return buf.append(")").toString();
+  }
+  
+  /**
+   * Appends every field of {@code row} to {@code buf}, each preceded by a one-byte
+   * tag: SER_DATA for a CachedDeserializable (its serialized bytes are written),
+   * BYTEARR_DATA for a byte array, and UNSER_DATA for anything else (written with
+   * DataSerializer.writeObject).
+   */
+  private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException {
+    for (int i = 0; i < row.length; i++) {
+      Object field = row[i];
+      if (field instanceof CachedDeserializable) {
+        buf.writeByte(SER_DATA);
+        byte[] serialized = ((CachedDeserializable) field).getSerializedValue();
+        DataSerializer.writeByteArray(serialized, buf);
+        continue;
+      }
+      if (field instanceof byte[]) {
+        buf.writeByte(BYTEARR_DATA);
+        DataSerializer.writeByteArray((byte[]) field, buf);
+        continue;
+      }
+      buf.writeByte(UNSER_DATA);
+      DataSerializer.writeObject(field, buf);
+    }
+  }
+  
+  /**
+   * Sends the struct type as its own TYPE chunk when a struct type was given.
+   *
+   * @return the number of bytes sent, or 0 when there is no struct type
+   */
+  private int sendType(HeapDataOutputStream buf) throws IOException {
+    // logger.info(desc + " struct type: " + structType);
+    if (structType == null) {
+      return 0;  // default KeyValue type, no type info send
+    }
+    buf.writeByte(TYPE_CHUNK);
+    DataSerializer.writeObject(structType, buf);
+    return sendBufferredData(buf, false);
+  }
+  
+  /**
+   * Hands the current buffer content to the result sender (as the last result
+   * when {@code isLast} is true) and then resets the buffer.
+   *
+   * @return the buffer size measured right before the reset
+   */
+  private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) throws IOException {
+    byte[] chunk = buf.toByteArray();
+    if (isLast) {
+      sender.lastResult(chunk);
+    } else {
+      sender.sendResult(chunk);
+    }
+    // logData(buf.toByteArray(), desc);
+    int sentSize = buf.size();
+    buf.reset();
+    return sentSize;
+  }
+
+  /**
+   * Send the exception as the last chunk of the result.
+   * If the exception itself cannot be serialized, both the original exception and
+   * the serialization failure are logged and an empty chunk is sent instead.
+   *
+   * @param buf the working buffer; any content it holds is discarded first
+   * @param e the exception to report to the receiver
+   */
+  private void sendException(HeapDataOutputStream buf, Exception e) {
+    // Note: if exception happens during the serialization, the `buf` may contain
+    // partial serialized data, which may cause de-serialization hang or error.
+    // Therefore, always empty the buffer before sending the exception
+    if (buf.size() > 0) buf.reset();
+    
+    try {
+      buf.writeByte(ERROR_CHUNK);
+      DataSerializer.writeObject(e, buf);
+    } catch (IOException ioe) {
+      logger.error("StructStreamingResultSender failed to send the result:", e);
+      logger.error("StructStreamingResultSender failed to serialize the exception:", ioe);
+      // drop the partially written ERROR chunk so that an empty chunk goes out below
+      buf.reset();
+    }
+    // Note: send empty chunk as the last result if serialization of exception 
+    // failed, and the error is logged on the GemFire server side.
+    sender.lastResult(buf.toByteArray());
+    // logData(buf.toByteArray(), desc);
+  }
+
+//  private void logData(byte[] data, String desc) {
+//    StringBuilder buf = new StringBuilder();
+//    buf.append(desc);
+//    for (byte b : data) {
+//      buf.append(" ").append(b);
+//    }
+//    logger.info(buf.toString());
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
new file mode 100644
index 0000000..9fba9e1
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+
+public class Employee implements Serializable {
+
+  private String name;
+
+  private int age;
+
+  public Employee(String n, int a) {
+    name = n;
+    age = a;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getAge() {
+    return age;
+  }
+
+  public String toString() {
+    return new StringBuilder().append("Employee[name=").append(name).
+            append(", age=").append(age).
+            append("]").toString();
+  }
+  
+  public boolean equals(Object o) {
+    if (o instanceof Employee) {
+      return ((Employee) o).name.equals(name) && ((Employee) o).age == age;
+    }
+    return false;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
new file mode 100644
index 0000000..8f5a045
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import com.gemstone.gemfire.cache.Region;
+import io.pivotal.geode.spark.connector.GeodeConnection;
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeConnectionConf$;
+import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager$;
+import io.pivotal.geode.spark.connector.javaapi.GeodeJavaRegionRDD;
+import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster$;
+import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.scalatest.junit.JUnitSuite;
+import io.pivotal.geode.spark.connector.package$;
+import scala.Tuple2;
+import scala.Option;
+import scala.Some;
+
+import java.util.*;
+
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.RDDSaveBatchSizePropKey;
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.javaFunctions;
+import static org.junit.Assert.*;
+
+public class JavaApiIntegrationTest extends JUnitSuite {
+
+  // Spark context and connector configuration shared by all tests;
+  // both are assigned in setUpBeforeClass().
+  static JavaSparkContext jsc = null;
+  static GeodeConnectionConf connConf = null;
+  
+  // passed to the test cluster as its "num-of-servers" setting
+  static int numServers = 2;
+  // how many entries the tests write to, and expect back from, the region
+  static int numObjects = 1000;
+  // path of the region that every test in this class reads and writes
+  static String regionPath = "pr_str_int_region";
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // start geode cluster, and spark context
+    Properties settings = new Properties();
+    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml");
+    settings.setProperty("num-of-servers", Integer.toString(numServers));
+    int locatorPort = GeodeCluster$.MODULE$.start(settings);
+
+    // start spark context in local mode
+    Properties props = new Properties();
+    props.put("log4j.logger.org.apache.spark", "INFO");
+    props.put("log4j.logger.io.pivotal.geode.spark.connector","DEBUG");
+    IOUtils.configTestLog4j("ERROR", props);
+    SparkConf conf = new SparkConf()
+            .setAppName("RetrieveRegionIntegrationTest")
+            .setMaster("local[2]")
+            .set(package$.MODULE$.GeodeLocatorPropKey(), "localhost:"+ locatorPort);
+    // sc = new SparkContext(conf);
+    jsc = new JavaSparkContext(conf);
+    connConf = GeodeConnectionConf.apply(jsc.getConf());
+  }
+
+  /**
+   * Stops the connector's connection, the Spark context, and the Geode cluster.
+   *
+   * JUnit 4 runs {@code @AfterClass} methods even when the {@code @BeforeClass} method threw,
+   * so {@code jsc} can still be null here. Each step is guarded so that a failure in an
+   * earlier step neither hides the original setup failure behind a NullPointerException
+   * nor leaves the later resources running.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      if (jsc != null) {
+        try {
+          DefaultGeodeConnectionManager$.MODULE$.closeConnection(GeodeConnectionConf$.MODULE$.apply(jsc.getConf()));
+        } finally {
+          jsc.stop();
+        }
+      }
+    } finally {
+      // NOTE(review): assumes GeodeCluster.stop() tolerates a cluster that never started - confirm
+      GeodeCluster$.MODULE$.stop();
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   utility methods
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Asserts that {@code list} has as many pairs as {@code map} has entries, and that every
+   * pair's value equals the value {@code map} holds for that pair's key.
+   */
+  private <K,V> void matchMapAndPairList(Map<K,V> map, List<Tuple2<K,V>> list) {
+    String sizeMessage = "size mismatch \nmap: " + map.toString() + "\nlist: " + list.toString();
+    assertTrue(sizeMessage, map.size() == list.size());
+    for (Tuple2<K, V> pair : list) {
+      K key = pair._1();
+      V fromList = pair._2();
+      V fromMap = map.get(key);
+      assertTrue("value mismatch: k=" + key + " v1=" + fromList + " v2=" + fromMap,
+                 fromList.equals(fromMap));
+    }
+  }
+
+  /**
+   * Removes every key the server reports for the region, then stores the entries
+   * ("k_" + i, i) for start <= i < stop. With start == stop the region is simply emptied.
+   *
+   * @return the region proxy obtained from the shared connection configuration
+   */
+  private Region<String, Integer> prepareStrIntRegion(String regionPath, int start, int stop) {
+    GeodeConnection connection = connConf.getConnection();
+    Region<String, Integer> target = connection.getRegionProxy(regionPath);
+    target.removeAll(target.keySetOnServer());
+
+    Map<String, Integer> fresh = new HashMap<>();
+    for (int n = start; n < stop; n++) {
+      fresh.put("k_" + n, n);
+    }
+    target.putAll(fresh);
+    return target;
+  }
+
+  /** Builds a pair RDD of ("k_" + i, i) for start <= i < stop. */
+  private JavaPairRDD<String, Integer> prepareStrIntJavaPairRDD(int start, int stop) {
+    List<Tuple2<String, Integer>> pairs = new ArrayList<>();
+    int i = start;
+    while (i < stop) {
+      pairs.add(new Tuple2<>("k_" + i, i));
+      i++;
+    }
+    return jsc.parallelizePairs(pairs);
+  }
+
+  /** Builds a pair RDD of (i, 2 * i) for start <= i < stop. */
+  private JavaPairRDD<Integer, Integer> prepareIntIntJavaPairRDD(int start, int stop) {
+    List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
+    int i = start;
+    while (i < stop) {
+      pairs.add(new Tuple2<>(i, i * 2));
+      i++;
+    }
+    return jsc.parallelizePairs(pairs);
+  }
+
+  /** Builds an RDD of the integers start, start + 1, ..., stop - 1. */
+  private JavaRDD<Integer> prepareIntJavaRDD(int start, int stop) {
+    List<Integer> numbers = new ArrayList<>();
+    int next = start;
+    while (next < stop) {
+      numbers.add(next);
+      next++;
+    }
+    return jsc.parallelize(numbers);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaRDD.saveToGeode
+  // --------------------------------------------------------------------------------------------
+
+  static class IntToStrIntPairFunction implements PairFunction<Integer, String, Integer> {
+    @Override public Tuple2<String, Integer> call(Integer x) throws Exception {
+      return new Tuple2<>("k_" + x, x);
+    }
+  }
+
+  /** JavaRDD save through saveToGeode(regionPath, func, opConf). */
+  @Test
+  public void testRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception {
+    final boolean useDefaultConnConf = true;
+    final boolean useOpConf = true;
+    verifyRDDSaveToGeode(useDefaultConnConf, useOpConf);
+  }
+
+  /** JavaRDD save through saveToGeode(regionPath, func). */
+  @Test
+  public void testRDDSaveToGeodeWithDefaultConnConf() throws Exception {
+    final boolean useDefaultConnConf = true;
+    final boolean useOpConf = false;
+    verifyRDDSaveToGeode(useDefaultConnConf, useOpConf);
+  }
+  
+  /** JavaRDD save through saveToGeode(regionPath, func, connConf, opConf). */
+  @Test
+  public void testRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
+    final boolean useDefaultConnConf = false;
+    final boolean useOpConf = true;
+    verifyRDDSaveToGeode(useDefaultConnConf, useOpConf);
+  }
+
+  /** JavaRDD save through saveToGeode(regionPath, func, connConf). */
+  @Test
+  public void testRDDSaveToGeodeWithConnConf() throws Exception {
+    final boolean useDefaultConnConf = false;
+    final boolean useOpConf = false;
+    verifyRDDSaveToGeode(useDefaultConnConf, useOpConf);
+  }
+  
+  public void verifyRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
+    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  // remove all entries
+    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects);
+
+    PairFunction<Integer, String, Integer> func = new IntToStrIntPairFunction();
+    Properties opConf = new Properties();
+    opConf.put(RDDSaveBatchSizePropKey, "200");
+
+    if (useDefaultConnConf) {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGeode(regionPath, func, opConf);
+      else
+        javaFunctions(rdd1).saveToGeode(regionPath, func);
+    } else {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGeode(regionPath, func, connConf, opConf);
+      else
+        javaFunctions(rdd1).saveToGeode(regionPath, func, connConf);
+    }
+    
+    Set<String> keys = region.keySetOnServer();
+    Map<String, Integer> map = region.getAll(keys);
+
+    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
+
+    for (int i = 0; i < numObjects; i ++) {
+      expectedList.add(new Tuple2<>("k_" + i, i));
+    }
+    matchMapAndPairList(map, expectedList);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaPairRDD.saveToGeode
+  // --------------------------------------------------------------------------------------------
+
+  /** JavaPairRDD save through saveToGeode(regionPath, opConf). */
+  @Test
+  public void testPairRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception {
+    final boolean useDefaultConnConf = true;
+    final boolean useOpConf = true;
+    verifyPairRDDSaveToGeode(useDefaultConnConf, useOpConf);
+  }
+
+  /** JavaPairRDD save through saveToGeode(regionPath). */
+  @Test
+  public void testPairRDDSaveToGeodeWithDefaultConnConf() throws Exception {
+    final boolean useDefaultConnConf = true;
+    final boolean useOpConf = false;
+    verifyPairRDDSaveToGeode(useDefaultConnConf, useOpConf);
+  }
+  
+  /** JavaPairRDD save through saveToGeode(regionPath, connConf, opConf). */
+  @Test
+  public void testPairRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
+    final boolean useDefaultConnConf = false;
+    final boolean useOpConf = true;
+    verifyPairRDDSaveToGeode(useDefaultConnConf, useOpConf);
+  }
+
+  /** JavaPairRDD save through saveToGeode(regionPath, connConf). */
+  @Test
+  public void testPairRDDSaveToGeodeWithConnConf() throws Exception {
+    final boolean useDefaultConnConf = false;
+    final boolean useOpConf = false;
+    verifyPairRDDSaveToGeode(useDefaultConnConf, useOpConf);
+  }
+  
+  public void verifyPairRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
+    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  // remove all entries
+    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, numObjects);
+    Properties opConf = new Properties();
+    opConf.put(RDDSaveBatchSizePropKey, "200");
+
+    if (useDefaultConnConf) {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGeode(regionPath, opConf);
+      else
+        javaFunctions(rdd1).saveToGeode(regionPath);
+    } else {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGeode(regionPath, connConf, opConf);
+      else
+        javaFunctions(rdd1).saveToGeode(regionPath, connConf);
+    }
+
+    Set<String> keys = region.keySetOnServer();
+    Map<String, Integer> map = region.getAll(keys);
+
+    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
+    for (int i = 0; i < numObjects; i ++) {
+      expectedList.add(new Tuple2<>("k_" + i, i));
+    }
+    matchMapAndPairList(map, expectedList);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaSparkContext.geodeRegion and where clause
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testJavaSparkContextGeodeRegion() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);  // remove all entries
+    Properties emptyProps = new Properties();
+    GeodeJavaRegionRDD<String, Integer> rdd1 = javaFunctions(jsc).geodeRegion(regionPath);
+    GeodeJavaRegionRDD<String, Integer> rdd2 = javaFunctions(jsc).geodeRegion(regionPath, emptyProps);
+    GeodeJavaRegionRDD<String, Integer> rdd3 = javaFunctions(jsc).geodeRegion(regionPath, connConf);
+    GeodeJavaRegionRDD<String, Integer> rdd4 = javaFunctions(jsc).geodeRegion(regionPath, connConf, emptyProps);
+    GeodeJavaRegionRDD<String, Integer> rdd5 = rdd1.where("value.intValue() < 50");
+
+    HashMap<String, Integer> expectedMap = new HashMap<>();
+    for (int i = 0; i < numObjects; i ++) {
+      expectedMap.put("k_" + i, i);
+    }
+
+    matchMapAndPairList(expectedMap, rdd1.collect());
+    matchMapAndPairList(expectedMap, rdd2.collect());
+    matchMapAndPairList(expectedMap, rdd3.collect());
+    matchMapAndPairList(expectedMap, rdd4.collect());
+
+    HashMap<String, Integer> expectedMap2 = new HashMap<>();
+    for (int i = 0; i < 50; i ++) {
+      expectedMap2.put("k_" + i, i);
+    }
+
+    matchMapAndPairList(expectedMap2, rdd5.collect());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaPairRDD.joinGeodeRegion
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testPairRDDJoinWithSameKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
+
+    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath);
+    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, connConf);
+    // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Tuple2<String, Integer>, Integer> expectedMap = new HashMap<>();
+    for (int i = 0; i < 10; i ++) {
+      expectedMap.put(new Tuple2<>("k_" + i, i), i);
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  static class IntIntPairToStrKeyFunction implements Function<Tuple2<Integer, Integer>, String> {
+    @Override public String call(Tuple2<Integer, Integer> pair) throws Exception {
+      return "k_" + pair._1();
+    }
+  }
+
+  @Test
+  public void testPairRDDJoinWithDiffKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
+    Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();
+
+    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
+    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Tuple2<Integer, Integer>, Integer> expectedMap = new HashMap<>();
+    for (int i = 0; i < 10; i ++) {
+      expectedMap.put(new Tuple2<>(i, i * 2), i);
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaPairRDD.outerJoinGeodeRegion
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testPairRDDOuterJoinWithSameKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
+
+    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath);
+    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Tuple2<String, Integer>, Option<Integer>> expectedMap = new HashMap<>();
+    for (int i = -5; i < 10; i ++) {
+      if (i < 0)
+        expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) null));
+      else
+        expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i));
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  @Test
+  public void testPairRDDOuterJoinWithDiffKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
+    Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();
+
+    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
+    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Tuple2<Integer, Integer>, Option<Integer>> expectedMap = new HashMap<>();
+    for (int i = -5; i < 10; i ++) {
+      if (i < 0)
+        expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null));
+      else
+        expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i));
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaRDD.joinGeodeRegion
+  // --------------------------------------------------------------------------------------------
+
+  static class IntToStrKeyFunction implements Function<Integer, String> {
+    @Override public String call(Integer x) throws Exception {
+      return "k_" + x;
+    }
+  }
+
+  @Test
+  public void testRDDJoinWithSameKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
+
+    Function<Integer, String> func = new IntToStrKeyFunction();
+    JavaPairRDD<Integer, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
+    JavaPairRDD<Integer, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Integer, Integer> expectedMap = new HashMap<>();
+    for (int i = 0; i < 10; i ++) {
+      expectedMap.put(i, i);
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaRDD.outerJoinGeodeRegion
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testRDDOuterJoinWithSameKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
+
+    Function<Integer, String> func = new IntToStrKeyFunction();
+    JavaPairRDD<Integer, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
+    JavaPairRDD<Integer, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Integer, Option<Integer>> expectedMap = new HashMap<>();
+    for (int i = -5; i < 10; i ++) {
+      if (i < 0)
+        expectedMap.put(i, Option.apply((Integer) null));
+      else
+        expectedMap.put(i, Some.apply(i));
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
new file mode 100644
index 0000000..1457db9
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.geode.spark.connector;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import com.gemstone.gemfire.cache.Declarable;
+
+/**
+ * A stock portfolio that consists of multiple {@link Position} objects that
+ * represent shares of stock (a "security").  Instances of
+ * <code>Portfolio</code> can be stored in a Geode <code>Region</code> and
+ * their contents can be queried using the Geode query service.
+ * <p>
+ * This class is <code>Serializable</code> because we want it to be distributed
+ * to multiple members of a distributed system.  Because this class is
+ * <code>Declarable</code>, we can describe instances of it in a Geode
+ * <code>cache.xml</code> file.
+ *
+ */
+public class Portfolio implements Declarable, Serializable {
+
+  private static final long serialVersionUID = 9097335119586059309L;
+
+  /** Used as the entry key and also stored in the entry. */
+  private int id;
+  private String type;
+  /** Positions keyed by the value of {@code Position.getSecId()}. */
+  private Map<String,Position> positions = new LinkedHashMap<String,Position>();
+  private String status;
+
+  /** Creates a portfolio and populates it from {@code props} via {@link #init(Properties)}. */
+  public Portfolio(Properties props) {
+    init(props);
+  }
+
+  /**
+   * Reads "id" (required, parsed as an int), "type" (default "type1") and "status"
+   * (default "active") from {@code props}, and registers every value stored under a key
+   * that starts with "position" as a {@link Position}.
+   */
+  @Override
+  public void init(Properties props) {
+    id = Integer.parseInt(props.getProperty("id"));
+    type = props.getProperty("type", "type1");
+    status = props.getProperty("status", "active");
+
+    // Positions are stored in the Properties as Position objects rather than Strings,
+    // so they are reached through the Map view instead of getProperty().
+    for (Map.Entry<Object, Object> property : props.entrySet()) {
+      String propertyName = (String) property.getKey();
+      if (!propertyName.startsWith("position")) {
+        continue;
+      }
+      Position position = (Position) property.getValue();
+      positions.put(position.getSecId(), position);
+    }
+  }
+
+  public void setType(String t) {
+    type = t;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public Map<String,Position> getPositions() {
+    return positions;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  /** @return true when the status is exactly "active" */
+  public boolean isActive() {
+    return status.equals("active");
+  }
+
+  /** Renders id, status and type, followed by one "key:position" line per position. */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    text.append("\n\tPortfolio [id=").append(id)
+        .append(" status=").append(status)
+        .append(" type=").append(type);
+    String separator = "";
+    for (Map.Entry<String, Position> held : positions.entrySet()) {
+      text.append(separator).append("\n\t\t")
+          .append(held.getKey()).append(":").append(held.getValue());
+      separator = ", ";
+    }
+    return text.append("]").toString();
+  }
+}
+


Mime
View raw message