geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasonhu...@apache.org
Subject [03/13] incubator-geode git commit: GEODE-9: Imported gemfire-spark-connector from geode-1.0.0-SNAPSHOT-2.src.tar
Date Mon, 27 Jul 2015 21:43:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala b/gemfire-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
new file mode 100644
index 0000000..efe229c
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
@@ -0,0 +1,28 @@
+package org.apache.spark.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.reflect.ClassTag
+
+class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+  extends InputDStream[T](ssc_) {
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[T]] = {
+    logInfo("Computing RDD for time " + validTime)
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    val selectedInput = if (index < input.size) input(index) else Seq[T]()
+
+    // lets us test cases where RDDs are not created
+    if (selectedInput == null)
+      return None
+
+    val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
+    logInfo("Created RDD " + rdd.id + " with " + selectedInput)
+    Some(rdd)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
new file mode 100644
index 0000000..951d6c9
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
@@ -0,0 +1,44 @@
+package io.pivotal.gemfire.spark.connector.javaapi;
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
+import io.pivotal.gemfire.spark.connector.streaming.GemFireDStreamFunctions;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream}
+ * to provide GemFire Spark Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
+ */ 
+public class GemFireJavaDStreamFunctions<T> {
+  
+  public final GemFireDStreamFunctions<T> dsf;
+
+  public GemFireJavaDStreamFunctions(JavaDStream<T> ds) {
+    this.dsf = new GemFireDStreamFunctions<T>(ds.dstream());
+  }
+
+  /**
+   * Save the JavaDStream to GemFire key-value store.
+   * @param regionPath the full path of region that the DStream is stored  
+   * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   */
+  public <K, V> void saveToGemfire(
+    String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) {
+    dsf.saveToGemfire(regionPath, func, connConf);
+  }
+
+  /**
+   * Save the JavaDStream to GemFire key-value store.
+   * @param regionPath the full path of region that the DStream is stored
+   * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
+   */
+  public <K, V> void saveToGemfire(
+          String regionPath, PairFunction<T, K, V> func) {
+    dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
new file mode 100644
index 0000000..3b43a65
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
@@ -0,0 +1,39 @@
+package io.pivotal.gemfire.spark.connector.javaapi;
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
+import io.pivotal.gemfire.spark.connector.streaming.GemFirePairDStreamFunctions;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream}
+ * to provide GemFire Spark Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
+ */
+public class GemFireJavaPairDStreamFunctions<K, V> {
+  
+  public final GemFirePairDStreamFunctions<K, V> dsf;
+
+  public GemFireJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) {    
+    this.dsf = new GemFirePairDStreamFunctions<K, V>(ds.dstream());
+  }
+
+  /**
+   * Save the JavaPairDStream to GemFire key-value store.
+   * @param regionPath the full path of region that the DStream is stored  
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   */  
+  public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) {
+    dsf.saveToGemfire(regionPath, connConf);
+  }
+
+  /**
+   * Save the JavaPairDStream to GemFire key-value store.
+   * @param regionPath the full path of region that the DStream is stored
+   */
+  public void saveToGemfire(String regionPath) {
+    dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
new file mode 100644
index 0000000..609cdbf
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
@@ -0,0 +1,203 @@
+package io.pivotal.gemfire.spark.connector.javaapi;
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
+import io.pivotal.gemfire.spark.connector.GemFirePairRDDFunctions;
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD;
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import scala.Option;
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+
+import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide GemFire Spark
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
+ */
+public class GemFireJavaPairRDDFunctions<K, V> {
+
+  public final GemFirePairRDDFunctions<K, V> rddf;
+
+  public GemFireJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
+    this.rddf = new GemFirePairRDDFunctions<K, V>(rdd.rdd());
+  }
+
+  /**
+   * Save the pair RDD to GemFire key-value store.
+   * @param regionPath the full path of region that the RDD is stored
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   */
+  public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) {
+    rddf.saveToGemfire(regionPath, connConf);
+  }
+
+  /**
+   * Save the pair RDD to GemFire key-value store with the default GemFireConnector.
+   * @param regionPath the full path of region that the RDD is stored
+   */
+  public void saveToGemfire(String regionPath) {
+    rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an JavaPairRDD containing all pairs of elements with matching keys in
+   * this RDD&lt;K, V> and the GemFire `Region&lt;K, V2>`. Each pair of elements
+   * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
+   * (k, v2) is in the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return JavaPairRDD&lt;&lt;K, V>, V2>
+   */  
+  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(String regionPath) {
+    return joinGemfireRegion(regionPath, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an JavaPairRDD containing all pairs of elements with matching keys in
+   * this RDD&lt;K, V> and the GemFire `Region&lt;K, V2>`. Each pair of elements
+   * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
+   * (k, v2) is in the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam V2 the value type of the GemFire region
+   * @return JavaPairRDD&lt;&lt;K, V>, V2>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
+    String regionPath, GemFireConnectionConf connConf) {
+    GemFireJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGemfireRegion(regionPath, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<V2> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD&lt;K, V> and the GemFire `Region&lt;K2, V2>`. The join key from RDD
+   * element is generated by `func(K, V) => K2`, and the key from the GemFire
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in this RDD and (k2, v2) is in the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
+    String regionPath, Function<Tuple2<K, V>, K2> func) {
+    return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD&lt;K, V> and the GemFire `Region&lt;K2, V2>`. The join key from RDD 
+   * element is generated by `func(K, V) => K2`, and the key from the GemFire 
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in this RDD and (k2, v2) is in the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
+   */  
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
+    String regionPath, Function<Tuple2<K, V>, K2> func, GemFireConnectionConf connConf) {
+    GemFireJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGemfireRegion(regionPath, func, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<V2> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;K, V> and the GemFire `Region&lt;K, V2>`.
+   * For each element (k, v) in this RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
+   * ((k, v), None)) if no element in the GemFire region have key k.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(String regionPath) {
+    return outerJoinGemfireRegion(regionPath, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;K, V> and the GemFire `Region&lt;K, V2>`.
+   * For each element (k, v) in this RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
+   * ((k, v), None)) if no element in the GemFire region have key k.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
+   */  
+  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
+    String regionPath, GemFireConnectionConf connConf) {
+    GemFireOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGemfireRegion(regionPath, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<Option<V2>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;K, V> and the GemFire `Region&lt;K2, V2>`.
+   * The join key from RDD element is generated by `func(K, V) => K2`, and the
+   * key from region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
+   * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
+    String regionPath, Function<Tuple2<K, V>, K2> func) {
+    return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;K, V> and the GemFire `Region&lt;K2, V2>`.
+   * The join key from RDD element is generated by `func(K, V) => K2`, and the
+   * key from region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
+   * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
+    String regionPath, Function<Tuple2<K, V>, K2> func, GemFireConnectionConf connConf) {
+    GemFireOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<Option<V2>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
new file mode 100644
index 0000000..ccdabb7
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
@@ -0,0 +1,136 @@
+package io.pivotal.gemfire.spark.connector.javaapi;
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
+import io.pivotal.gemfire.spark.connector.GemFireRDDFunctions;
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD;
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide GemFire Spark
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
+ */
+public class GemFireJavaRDDFunctions<T> {
+
+  public final GemFireRDDFunctions<T> rddf;
+
+  public GemFireJavaRDDFunctions(JavaRDD<T> rdd) {
+    this.rddf = new GemFireRDDFunctions<T>(rdd.rdd());
+  }
+
+  /**
+   * Save the non-pair RDD to GemFire key-value store.
+   * @param regionPath the full path of region that the RDD is stored  
+   * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   */  
+  public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) {
+    rddf.saveToGemfire(regionPath, func, connConf);
+  }
+
+  /**
+   * Save the non-pair RDD to GemFire key-value store with default GemFireConnector.
+   * @param regionPath the full path of region that the RDD is stored  
+   * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
+   */
+  public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func) {
+    rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD&lt;T> and the GemFire `Region&lt;K, V>`. The join key from RDD
+   * element is generated by `func(T) => K`, and the key from the GemFire
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
+   * where t is from this RDD and v is from the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element T
+   * @tparam K the key type of the GemFire region
+   * @tparam V the value type of the GemFire region
+   * @return JavaPairRDD&lt;T, V>
+   */
+  public <K, V> JavaPairRDD<T, V> joinGemfireRegion(String regionPath, Function<T, K> func) {
+    return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD&lt;T> and the GemFire `Region&lt;K, V>`. The join key from RDD
+   * element is generated by `func(T) => K`, and the key from the GemFire
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
+   * where t is from this RDD and v is from the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element T
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K the key type of the GemFire region
+   * @tparam V the value type of the GemFire region
+   * @return JavaPairRDD&lt;T, V>
+   */
+  public <K, V> JavaPairRDD<T, V> joinGemfireRegion(
+    String regionPath, Function<T, K> func, GemFireConnectionConf connConf) {
+    GemFireJoinRDD<T, K, V> rdd = rddf.joinGemfireRegion(regionPath, func, connConf);
+    ClassTag<T> kt = fakeClassTag();
+    ClassTag<V> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;T> and the GemFire `Region&lt;K, V>`.
+   * The join key from RDD element is generated by `func(T) => K`, and the
+   * key from region is just the key of the key/value pair.
+   *
+   * For each element (t) in this RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the GemFire region, or the pair
+   * (t, None) if no element in the GemFire region have key `func(t)`.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element T
+   * @tparam K the key type of the GemFire region
+   * @tparam V the value type of the GemFire region
+   * @return JavaPairRDD&lt;T, Option&lt;V>>
+   */
+  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(String regionPath, Function<T, K> func) {
+    return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;T> and the GemFire `Region&lt;K, V>`.
+   * The join key from RDD element is generated by `func(T) => K`, and the
+   * key from region is just the key of the key/value pair.
+   *
+   * For each element (t) in this RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the GemFire region, or the pair
+   * (t, None) if no element in the GemFire region have key `func(t)`.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element T
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K the key type of the GemFire region
+   * @tparam V the value type of the GemFire region
+   * @return JavaPairRDD&lt;T, Option&lt;V>>
+   */
+  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(
+    String regionPath, Function<T, K> func, GemFireConnectionConf connConf) {
+    GemFireOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf);
+    ClassTag<T> kt = fakeClassTag();
+    ClassTag<Option<V>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java
new file mode 100644
index 0000000..eba1ad9
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java
@@ -0,0 +1,33 @@
+package io.pivotal.gemfire.spark.connector.javaapi;
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
+import io.pivotal.gemfire.spark.connector.GemFireSQLContextFunctions;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+/**
+ * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide GemFire
+ * OQL functionality.
+ *
+ * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
+ */
+public class GemFireJavaSQLContextFunctions {
+
+  public final GemFireSQLContextFunctions scf;
+
+  public GemFireJavaSQLContextFunctions(SQLContext sqlContext) {
+    scf = new GemFireSQLContextFunctions(sqlContext);
+  }
+
+  public <T> DataFrame gemfireOQL(String query) {
+    DataFrame df = scf.gemfireOQL(query, scf.defaultConnectionConf());
+    return df;
+  }
+
+  public <T> DataFrame gemfireOQL(String query, GemFireConnectionConf connConf) {
+    DataFrame df = scf.gemfireOQL(query, connConf);
+    return df;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java
new file mode 100644
index 0000000..f822a1f
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java
@@ -0,0 +1,71 @@
+package io.pivotal.gemfire.spark.connector.javaapi;
+
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD;
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD$;
+import org.apache.spark.SparkContext;
+import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
+
+import scala.reflect.ClassTag;
+import java.util.Properties;
+
+/**
+ * Java API wrapper over {@link org.apache.spark.SparkContext} to provide GemFire
+ * Connector functionality.
+ *
+ * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
+ * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
+ */
+public class GemFireJavaSparkContextFunctions {
+
+  public final SparkContext sc;
+
+  public GemFireJavaSparkContextFunctions(SparkContext sc) {
+    this.sc = sc;
+  }
+
+  /**
+   * Expose a GemFire region as a JavaPairRDD
+   * @param regionPath the full path of the region
+   * @param connConf the GemFireConnectionConf that can be used to access the region
+   * @param opConf the parameters for this operation, such as preferred partitioner.
+   */
+  public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(
+    String regionPath, GemFireConnectionConf connConf, Properties opConf) {
+    ClassTag<K> kt = fakeClassTag();
+    ClassTag<V> vt = fakeClassTag();    
+    GemFireRegionRDD<K, V>  rdd =  GemFireRegionRDD$.MODULE$.apply(
+      sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
+    return new GemFireJavaRegionRDD<>(rdd);
+  }
+
+  /**
+   * Expose a GemFire region as a JavaPairRDD with default GemFireConnector and no preferred partitioner.
+   * @param regionPath the full path of the region
+   */
+  public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath) {
+    GemFireConnectionConf connConf = GemFireConnectionConf.apply(sc.getConf());
+    return gemfireRegion(regionPath, connConf, new Properties());
+  }
+
+  /**
+   * Expose a GemFire region as a JavaPairRDD with no preferred partitioner.
+   * @param regionPath the full path of the region
+   * @param connConf the GemFireConnectionConf that can be used to access the region
+   */
+  public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath, GemFireConnectionConf connConf) {
+    return gemfireRegion(regionPath, connConf, new Properties());
+  }
+
+  /**
+   * Expose a GemFire region as a JavaPairRDD with default GemFireConnector.
+   * @param regionPath the full path of the region
+   * @param opConf the parameters for this operation, such as preferred partitioner.
+   */
+  public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath, Properties opConf) {
+    GemFireConnectionConf connConf = GemFireConnectionConf.apply(sc.getConf());
+    return gemfireRegion(regionPath, connConf, opConf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
new file mode 100644
index 0000000..ff11588
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
@@ -0,0 +1,105 @@
+package io.pivotal.gemfire.spark.connector.javaapi;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import scala.Tuple2;
+
+import io.pivotal.gemfire.spark.connector.package$;
+
+/**
+ * The main entry point to Spark GemFire Connector Java API.
+ *
+ * There are several helpful static factory methods which build useful wrappers
+ * around Spark Context, Streaming Context and RDD. There are also helper methods
+ * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>.
+ */
+public final class GemFireJavaUtil {
+  
+  /** constants */
+  public static String GemFireLocatorPropKey = package$.MODULE$.GemFireLocatorPropKey();
+  // partitioner related keys and values
+  public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
+  public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
+  public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
+  public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();
+
+  
+  /** The private constructor is used prevents user from creating instance of this class. */
+  private GemFireJavaUtil() { }
+
+  /**
+   * A static factory method to create a {@link GemFireJavaSparkContextFunctions} based
+   * on an existing {@link SparkContext} instance.
+   */
+  public static GemFireJavaSparkContextFunctions javaFunctions(SparkContext sc) {
+    return new GemFireJavaSparkContextFunctions(sc);
+  }
+
+  /**
+   * A static factory method to create a {@link GemFireJavaSparkContextFunctions} based
+   * on an existing {@link JavaSparkContext} instance.
+   */
+  public static GemFireJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
+    return new GemFireJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
+  }
+
+  /**
+   * A static factory method to create a {@link GemFireJavaPairRDDFunctions} based on an
+   * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
+   */
+  public static <K, V> GemFireJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
+    return new GemFireJavaPairRDDFunctions<K, V>(rdd);
+  }
+
+  /**
+   * A static factory method to create a {@link GemFireJavaRDDFunctions} based on an
+   * existing {@link org.apache.spark.api.java.JavaRDD} instance.
+   */
+  public static <T> GemFireJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
+    return new GemFireJavaRDDFunctions<T>(rdd);
+  }
+
+  /**
+   * A static factory method to create a {@link GemFireJavaPairDStreamFunctions} based on an
+   * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
+   */
+  public static <K, V> GemFireJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
+    return new GemFireJavaPairDStreamFunctions<>(ds);
+  }
+
+  /**
+   * A static factory method to create a {@link GemFireJavaDStreamFunctions} based on an
+   * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
+   */
+  public static <T> GemFireJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
+    return new GemFireJavaDStreamFunctions<>(ds);
+  }
+
+  /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}&lt;&lt;Tuple2&lt;K, V&gt;&gt;
+   * to a {@link org.apache.spark.api.java.JavaPairRDD}&lt;K, V&gt;.
+   */
+  public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
+    return JavaAPIHelper.toJavaPairRDD(rdd);
+  }
+
+  /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}&lt;&lt;Tuple2&lt;K, V&gt;&gt;
+   * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}&lt;K, V&gt;.
+   */
+  public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
+    return JavaAPIHelper.toJavaPairDStream(ds);
+  }
+
+  /**
+   * A static factory method to create a {@link GemFireJavaSQLContextFunctions} based
+   * on an existing {@link SQLContext} instance.
+   */
+  public static GemFireJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
+    return new GemFireJavaSQLContextFunctions(sqlContext);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala
new file mode 100644
index 0000000..9c58b78
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala
@@ -0,0 +1,51 @@
+package io.pivotal.gemfire.spark.connector
+
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.cache.query.Query
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition
+
+
+trait GemFireConnection {
+
+  /**
+   * Validate region existence and key/value type constraints, throw RuntimeException
+   * if region does not exist or key and/or value type do(es) not match.
+   * @param regionPath the full path of region
+   */
+  def validateRegion[K, V](regionPath: String): Unit
+
+  /**
+   * Get Region proxy for the given region
+   * @param regionPath the full path of region
+   */
+  def getRegionProxy[K, V](regionPath: String): Region[K, V]
+
+  /**
+   * Retrieve region meta data for the given region. 
+   * @param regionPath: the full path of the region
+   * @return Some[RegionMetadata] if region exists, None otherwise
+   */
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
+
+  /** 
+   * Retrieve region data for the given region and bucket set 
+   * @param regionPath: the full path of the region
+   * @param whereClause: the set of bucket IDs
+   * @param split: GemFire RDD Partition instance
+   */
+  def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GemFireRDDPartition): Iterator[(K, V)]
+
+  def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
+  /** 
+   * Create a gemfire OQL query
+   * @param queryString GemFire OQL query string
+   */
+  def getQuery(queryString: String): Query
+
+  /** Close the connection */
+  def close(): Unit
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
new file mode 100644
index 0000000..2b8a0a8
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
@@ -0,0 +1,57 @@
+package io.pivotal.gemfire.spark.connector
+
+import org.apache.spark.SparkConf
+import io.pivotal.gemfire.spark.connector.internal.{DefaultGemFireConnectionManager, LocatorHelper}
+
+/**
+ * Stores configuration of a connection to GemFire cluster. It is serializable and can
+ * be safely sent over network.
+ *
+ * @param locators GemFire locator host:port pairs, the default is (localhost,10334)
+ * @param gemfireProps The initial gemfire properties to be used.
+ * @param connectionManager GemFireConnectionFactory instance
+ */
+class GemFireConnectionConf(
+   val locators: Seq[(String, Int)], 
+   val gemfireProps: Map[String, String] = Map.empty,
+   connectionManager: GemFireConnectionManager = new DefaultGemFireConnectionManager
+  ) extends Serializable {
+
+  /** require at least 1 pair of (host,port) */
+  require(locators.nonEmpty)
+  
+  def getConnection: GemFireConnection = connectionManager.getConnection(this)
+  
+}
+
+object GemFireConnectionConf {
+
+  /**
+   * create GemFireConnectionConf object based on locator string and optional GemFireConnectionFactory
+   * @param locatorStr GemFire cluster locator string
+   * @param connectionManager GemFireConnection factory
+   */
+  def apply(locatorStr: String, gemfireProps: Map[String, String] = Map.empty)
+    (implicit connectionManager: GemFireConnectionManager = new DefaultGemFireConnectionManager): GemFireConnectionConf = {
+    new GemFireConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), gemfireProps, connectionManager)
+  }
+
+  /**
+   * create GemFireConnectionConf object based on SparkConf. Note that implicit can
+   * be used to control what GemFireConnectionFactory instance to use if desired
+   * @param conf a SparkConf instance 
+   */
+  def apply(conf: SparkConf): GemFireConnectionConf = {
+    val locatorStr = conf.getOption(GemFireLocatorPropKey).getOrElse(
+      throw new RuntimeException(s"SparkConf does not contain property $GemFireLocatorPropKey"))
+    // SparkConf only holds properties whose key starts with "spark.", In order to
+    // put gemfire properties in SparkConf, all gemfire properties are prefixes with
+    // "spark.gemfire.". This prefix was removed before the properties were put in `gemfireProp`
+    val prefix = "spark.gemfire."
+    val gemfireProps = conf.getAll.filter {
+        case (k, v) => k.startsWith(prefix) && k != GemFireLocatorPropKey
+      }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
+    apply(locatorStr, gemfireProps)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
new file mode 100644
index 0000000..3b42737
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
@@ -0,0 +1,15 @@
+package io.pivotal.gemfire.spark.connector
+
+/**
+ * GemFireConnectionFactory provide an common interface that manages GemFire
+ * connections, and it's serializable. Each factory instance will handle
+ * connection instance creation and connection pool management.
+ */
+trait GemFireConnectionManager extends Serializable {
+
+  /** get connection for the given connector */
+  def getConnection(connConf: GemFireConnectionConf): GemFireConnection
+
+  /** close the connection */
+  def closeConnection(connConf: GemFireConnectionConf): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
new file mode 100644
index 0000000..726b785
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
@@ -0,0 +1,65 @@
+package io.pivotal.gemfire.spark.connector
+
+import java.io.File
+import java.net.URL
+import org.apache.commons.httpclient.methods.PostMethod
+import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
+import org.apache.commons.httpclient.HttpClient
+import org.apache.spark.Logging
+
+object GemFireFunctionDeployer {
+  def main(args: Array[String]) {
+    new GemFireFunctionDeployer(new HttpClient()).commandLineRun(args)
+  }
+}
+
+class GemFireFunctionDeployer(val httpClient:HttpClient) extends Logging {
+
+  def deploy(host: String, port: Int, jarLocation: String): String =
+    deploy(host + ":" + port, jarLocation)
+  
+  def deploy(host: String, port: Int, jar:File): String =
+    deploy(host + ":" + port, jar)
+  
+  def deploy(jmxHostAndPort: String, jarLocation: String): String =
+    deploy(jmxHostAndPort, jarFileHandle(jarLocation))
+  
+  def deploy(jmxHostAndPort: String, jar: File): String = {
+    val urlString = constructURLString(jmxHostAndPort)
+    val filePost: PostMethod = new PostMethod(urlString)
+    val parts: Array[Part] = new Array[Part](1)
+    parts(0) = new FilePart("resources", jar)
+    filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
+    val status: Int = httpClient.executeMethod(filePost)
+    "Deployed Jar with status:" + status
+  }
+
+  private[connector] def constructURLString(jmxHostAndPort: String) =
+    "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
+
+  private[connector]def jarFileHandle(jarLocation: String) = {
+    val f: File = new File(jarLocation)
+    if (!f.exists()) {
+      val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath
+      logInfo(errorMessage)
+      throw new RuntimeException(errorMessage)
+    }
+    f
+  }
+  
+  def commandLineRun(args: Array[String]):Unit = {
+    val (hostPort: String, jarFile: String) =
+    if (args.length < 2) {
+      logInfo("JMX Manager Host and Port (example: localhost:7070):")
+      val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
+      val jmxHostAndPort = bufferedReader.readLine()
+      logInfo("Location of gemfire-functions.jar:")
+      val functionJarLocation = bufferedReader.readLine()
+      (jmxHostAndPort, functionJarLocation)
+    } else {
+      (args(0), args(1))
+    }
+    val status = deploy(hostPort, jarFile)
+    logInfo(status)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
new file mode 100644
index 0000000..f1213a1
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
@@ -0,0 +1,13 @@
+package io.pivotal.gemfire.spark.connector
+
+import com.esotericsoftware.kryo.Kryo
+import io.pivotal.gemfire.spark.connector.internal.oql.UndefinedSerializer
+import org.apache.spark.serializer.KryoRegistrator
+import com.gemstone.gemfire.cache.query.internal.Undefined
+
+class GemFireKryoRegistrator extends KryoRegistrator{
+
+  override def registerClasses(kyro: Kryo): Unit = {
+    kyro.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
new file mode 100644
index 0000000..9fb1c04
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
@@ -0,0 +1,117 @@
+package io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFirePairRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra gemFire functions on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
+
+  /**
+   * Save the RDD of pairs to GemFire key-value store without any conversion
+   * @param regionPath the full path of region that the RDD is stored
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   */
+  def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
+    val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf)
+    rdd.sparkContext.runJob(rdd, writer.write _)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this`
+   * RDD and the GemFire `Region[K, V2]`. Each pair of elements will be returned
+   * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
+   * GemFire region.
+   *
+   *@param regionPath the region path of the GemFire region
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return RDD[T, V]
+   */
+  def joinGemfireRegion[K2 <: K, V2](
+    regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K, V2] = {
+    new GemFireJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+   * and the GemFire `Region[K2, V2]`. The join key from RDD element is generated by
+   * `func(K, V) => K2`, and the key from the GemFire region is jus the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in `this` RDD and (k2, v2) is in the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return RDD[(K, V), V2]
+   */
+  def joinGemfireRegion[K2, V2](
+    regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K2, V2] =
+    new GemFireJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+
+  /** This version of joinGemfireRegion(...) is just for Java API. */
+  private[connector] def joinGemfireRegion[K2, V2](
+    regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireJoinRDD[(K, V), K2, V2] = {
+    new GemFireJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the GemFire `Region[K, V2]`.
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
+   * ((k, v), None)) if no element in the GemFire region have key k.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return RDD[ (K, V), Option[V] ]
+   */
+  def outerJoinGemfireRegion[K2 <: K, V2](
+    regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K, V2] = {
+    new GemFireOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the GemFire `Region[K2, V2]`.
+   * The join key from RDD element is generated by `func(K, V) => K2`, and the
+   * key from region is jus the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
+   * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`.
+   *
+   *@param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return RDD[ (K, V), Option[V] ]
+   */
+  def outerJoinGemfireRegion[K2, V2](
+    regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = {
+    new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGemfireRegion(...) is just for Java API. */
+  private[connector] def outerJoinGemfireRegion[K2, V2](
+    regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = {
+    new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(rdd.sparkContext.getConf)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
new file mode 100644
index 0000000..4ffacc5
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
@@ -0,0 +1,93 @@
+package io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFireRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.{PairFunction, Function}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra gemFire functions on non-Pair RDDs through an implicit conversion.
+ * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to 
+ * use these functions.  
+ */
+class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
+
+  /**
+   * Save the non-pair RDD to GemFire key-value store.
+   * @param regionPath the full path of region that the RDD is stored  
+   * @param func the function that converts elements of RDD to key/value pairs
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   */
+  def saveToGemfire[K, V](regionPath: String, func: T => (K, V), connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
+    val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf)
+    rdd.sparkContext.runJob(rdd, writer.write(func) _)
+  }
+
+  /** This version of saveToGemfire(...) is just for Java API. */
+  private[connector] def saveToGemfire[K, V](
+    regionPath: String, func: PairFunction[T, K, V], connConf: GemFireConnectionConf): Unit = {
+    saveToGemfire[K, V](regionPath, func.call _, connConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+   * and the GemFire `Region[K, V]`. The join key from RDD element is generated by
+   * `func(T) => K`, and the key from the GemFire region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a (t, v) tuple, 
+   * where (t) is in `this` RDD and (k, v) is in the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generate region key from RDD element T
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K the key type of the GemFire region
+   * @tparam V the value type of the GemFire region
+   * @return RDD[T, V]
+   */
+  def joinGemfireRegion[K, V](regionPath: String, func: T => K, 
+    connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[T, K, V] = {
+    new GemFireJoinRDD[T, K, V](rdd, func, regionPath, connConf)    
+  }
+
+  /** This version of joinGemfireRegion(...) is just for Java API. */
+  private[connector] def joinGemfireRegion[K, V](
+    regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireJoinRDD[T, K, V] = {
+    joinGemfireRegion(regionPath, func.call _, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the GemFire `Region[K, V]`.
+   * The join key from RDD element is generated by `func(T) => K`, and the
+   * key from region is just the key of the key/value pair.
+   *
+   * For each element (t) in `this` RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the GemFire region, or the pair
+   * (t, None) if no element in the GemFire region have key `func(t)`
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generate region key from RDD element T
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K the key type of the GemFire region
+   * @tparam V the value type of the GemFire region
+   * @return RDD[ T, Option[V] ]
+   */
+  def outerJoinGemfireRegion[K, V](regionPath: String, func: T => K,
+    connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[T, K, V] = {
+    new GemFireOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGemfireRegion(...) is just for Java API. */
+  private[connector] def outerJoinGemfireRegion[K, V](
+    regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[T, K, V] = {
+    outerJoinGemfireRegion(regionPath, func.call _, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(rdd.sparkContext.getConf)
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
new file mode 100644
index 0000000..2fb128a
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
@@ -0,0 +1,26 @@
+package io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.oql.{OQLRelation, QueryRDD}
+import org.apache.spark.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Provide GemFire OQL specific functions
+ */
+class GemFireSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging {
+
+  /**
+   * Expose a GemFire OQL query result as a DataFrame
+   * @param query the OQL query string.
+   */
+  def gemfireOQL(
+    query: String,
+    connConf: GemFireConnectionConf = GemFireConnectionConf(sqlContext.sparkContext.getConf)): DataFrame = {
+    logInfo(s"OQL query = $query")
+    val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
+    sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext))
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(sqlContext.sparkContext.getConf)
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
new file mode 100644
index 0000000..ce13786
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
@@ -0,0 +1,23 @@
+package io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD
+import org.apache.spark.SparkContext
+
+import scala.reflect.ClassTag
+
+/** Provides GemFire specific methods on `SparkContext` */
+class GemFireSparkContextFunctions(@transient sc: SparkContext) extends Serializable {
+
+  /**
+   * Expose a GemFire region as a GemFireRDD
+   * @param regionPath the full path of the region
+   * @param connConf the GemFireConnectionConf that can be used to access the region
+   * @param opConf use this to specify preferred partitioner
+   *        and its parameters. The implementation will use it if it's applicable
+   */
+  def gemfireRegion[K: ClassTag, V: ClassTag] (
+    regionPath: String, connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf),
+    opConf: Map[String, String] = Map.empty): GemFireRegionRDD[K, V] =
+    GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
new file mode 100644
index 0000000..e31186b
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
@@ -0,0 +1,126 @@
+package io.pivotal.gemfire.spark.connector.internal
+
+import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
+import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
+import com.gemstone.gemfire.cache.query.Query
+import com.gemstone.gemfire.cache.{Region, RegionService}
+import com.gemstone.gemfire.internal.cache.execute.InternalExecution
+import io.pivotal.gemfire.spark.connector.internal.oql.QueryResultCollector
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition
+import org.apache.spark.Logging
+import io.pivotal.gemfire.spark.connector.GemFireConnection
+import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions._
+import java.util.{Set => JSet, List => JList }
+
+/**
+ * Default GemFireConnection implementation. The instance of this should be
+ * created by DefaultGemFireConnectionFactory
+ * @param locators pairs of host/port of locators
+ * @param gemFireProps The initial gemfire properties to be used.
+ */
+private[connector] class DefaultGemFireConnection (
+  locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) 
+  extends GemFireConnection with Logging {
+
+  private val clientCache = initClientCache()
+  /** a lock object only used by getRegionProxy...() */
+  private val regionLock = new Object
+
+  /** Register GemFire functions to the GemFire cluster */
+  FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
+  FunctionService.registerFunction(RetrieveRegionFunction.getInstance())
+
+  private def initClientCache() : ClientCache = {
+    try {
+      import io.pivotal.gemfire.spark.connector.map2Properties
+      logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
+      val ccf = new ClientCacheFactory(gemFireProps)
+      locators.foreach { case (host, port)  => ccf.addPoolLocator(host, port) }
+      ccf.create()
+    } catch {
+      case e: Exception =>
+        logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""")
+        throw new RuntimeException(e)
+    }
+  }
+
+  /** close the clientCache */
+  override def close(): Unit =
+    if (! clientCache.isClosed) clientCache.close()
+
+  /** ----------------------------------------- */
+  /** implementation of GemFireConnection trait */
+  /** ----------------------------------------- */
+
+  override def getQuery(queryString: String): Query =
+    clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)
+
+  override def validateRegion[K, V](regionPath: String): Unit = {
+    val md = getRegionMetadata[K, V](regionPath)
+    if (! md.isDefined) throw new RuntimeException(s"The region named $regionPath was not found")
+  }
+
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
+    import scala.collection.JavaConversions.setAsJavaSet
+    val region = getRegionProxy[K, V](regionPath)
+    val set0: JSet[Integer] = Set[Integer](0)
+    val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0)
+    exec.setWaitOnExceptionFlag(true)
+    try {
+      val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
+      val r = collector.getResult.asInstanceOf[JList[RegionMetadata]]
+      logDebug(r.get(0).toString)
+      Some(r.get(0))
+    } catch {
+      case e: FunctionException => 
+        if (e.getMessage.contains(s"The region named /$regionPath was not found")) None
+        else throw e
+    }
+  }
+
+  def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
+    val region1: Region[K, V] = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
+    if (region1 != null) region1
+    else regionLock.synchronized {
+      val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
+      if (region2 != null) region2
+      else clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath)
+    }
+  }
+
+  override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GemFireRDDPartition): Iterator[(K, V)] = {
+    val region = getRegionProxy[K, V](regionPath)
+    val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", ${split.index})"""
+    val args : Array[String] = Array[String](whereClause.getOrElse(""), desc)
+    val collector = new StructStreamingResultCollector(desc)
+        // RetrieveRegionResultCollector[(K, V)]
+    import scala.collection.JavaConversions.setAsJavaSet
+    val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
+      .withBucketFilter(split.bucketSet.map(Integer.valueOf))
+    exec.setWaitOnExceptionFlag(true)
+    exec.execute(RetrieveRegionFunction.ID)
+    collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V])}
+  }
+
+  override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = {
+    import scala.collection.JavaConversions.setAsJavaSet
+    FunctionService.registerFunction(QueryFunction.getInstance())
+    val collector = new QueryResultCollector
+    val region = getRegionProxy(regionPath)
+    val args: Array[String] = Array[String](queryString, bucketSet.toString)
+    val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
+      .withBucketFilter(bucketSet.map(Integer.valueOf))
+      .withArgs(args)
+    exec.execute(QueryFunction.ID)
+    collector.getResult
+  }
+}
+
+
+/** The purpose of this class is making unit test DefaultGemFireConnectionManager easier */
+class DefaultGemFireConnectionFactory {
+
+  def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) =
+    new DefaultGemFireConnection(locators, gemFireProps)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
new file mode 100644
index 0000000..0463340
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
@@ -0,0 +1,51 @@
+package io.pivotal.gemfire.spark.connector.internal
+
+import io.pivotal.gemfire.spark.connector.{GemFireConnection, GemFireConnectionConf, GemFireConnectionManager}
+
+import scala.collection.mutable
+
+/**
+ * Default implementation of GemFireConnectionFactory
+ */
+class DefaultGemFireConnectionManager extends GemFireConnectionManager {
+
+  def getConnection(connConf: GemFireConnectionConf): GemFireConnection =
+    DefaultGemFireConnectionManager.getConnection(connConf)
+
+  def closeConnection(connConf: GemFireConnectionConf): Unit =
+    DefaultGemFireConnectionManager.closeConnection(connConf)
+
+}
+
+object DefaultGemFireConnectionManager  {
+
+  /** connection cache, keyed by host:port pair */
+  private[connector] val connections = mutable.Map[(String, Int), GemFireConnection]()
+
+  /**
+   * use locator host:port pair to lookup connection. create new connection and add it
+   * to `connections` if it does not exists.
+   */
+  def getConnection(connConf: GemFireConnectionConf)
+    (implicit factory: DefaultGemFireConnectionFactory = new DefaultGemFireConnectionFactory): GemFireConnection = {
+    val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null)
+    if (conns.nonEmpty) conns(0)
+    else connections.synchronized {
+      val conn = factory.newConnection(connConf.locators, connConf.gemfireProps)
+      connConf.locators.foreach(pair => connections += (pair -> conn))
+      conn
+    }
+  }
+
+  /**
+   * Close the connection and remove it from connection cache.
+   * Note: multiple entries may share the same connection, all those entries are removed.
+   */
+  def closeConnection(connConf: GemFireConnectionConf): Unit = {
+    val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null)
+    if (conns.nonEmpty) connections.synchronized {
+      conns(0).close()
+      connections.retain((k,v) => v != conns(0))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
new file mode 100644
index 0000000..550e0bc
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
@@ -0,0 +1,30 @@
+package io.pivotal.gemfire.spark.connector.internal
+
+import scala.util.{Failure, Success, Try}
+
+
+object LocatorHelper {
+
+  /** valid locator strings are: host[port] and host:port */
+  final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
+  final val LocatorPattern2 = """([\w-_]+(\.[\w-_]+)*):([0-9]{2,5})""".r
+
+  /** convert single locator string to Try[(host, port)] */
+  def locatorStr2HostPortPair(locatorStr: String): Try[(String, Int)] =
+    locatorStr match {
+      case LocatorPattern1(host, domain, port) => Success((host, port.toInt))
+      case LocatorPattern2(host, domain, port) => Success((host, port.toInt))
+      case _ => Failure(new Exception(s"invalid locator: $locatorStr"))
+    }
+
+  /** 
+   * Parse locator strings and returns Seq of (hostname, port) pair. 
+   * Valid locator string are one or more "host[port]" and/or "host:port"
+   * separated by `,`. For example:
+   *    host1.mydomain.com[8888],host2.mydomain.com[8889] 
+   *    host1.mydomain.com:8888,host2.mydomain.com:8889 
+   */
+  def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
+    locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
new file mode 100644
index 0000000..5d95cec
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
@@ -0,0 +1,136 @@
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions
+
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl
+import com.gemstone.gemfire.cache.query.types.StructType
+import com.gemstone.gemfire.distributed.DistributedMember
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
+import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.StructStreamingResultSender.
+       {TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA}
+
+/**
+ * StructStreamingResultCollector and StructStreamingResultSender are paired
+ * to transfer result of list of `com.gemstone.gemfire.cache.query.Struct`
+ * from GemFire server to Spark Connector (the client of GemFire server) 
+ * in streaming, i.e., while sender sending the result, the collector can
+ * start processing the arrived result without waiting for full result to
+ * become available.
+ */
+class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] {
+
+  /** the constructor that provide default `desc` (description) */
+  def this() = this("StructStreamingResultCollector")
+  
+  private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]()
+  var structType: StructType = null
+
+  /** ------------------------------------------ */
+  /**  ResultCollector interface implementations */
+  /** ------------------------------------------ */
+  
+  override def getResult: Iterator[Array[Object]] = resultIterator
+
+  override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] = 
+    throw new UnsupportedOperationException()
+
+  /** addResult add non-empty byte array (chunk) to the queue */
+  override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit = 
+    if (chunk != null && chunk.size > 1) {
+      this.queue.add(chunk)
+      // println(s"""$desc receive from $memberID: ${chunk.mkString(" ")}""")
+    }
+
+  /** endResults add special `Array.empty` to the queue as marker of end of data */
+  override def endResults(): Unit = this.queue.add(Array.empty)
+  
+  override def clearResults(): Unit = this.queue.clear()
+
+  /** ------------------------------------------ */
+  /**             Internal methods               */
+  /** ------------------------------------------ */
+
+  def getResultType: StructType = {
+    // trigger lazy resultIterator initialization if necessary
+    if (structType == null)  resultIterator.hasNext
+    structType        
+  }
+
+  /**
+   * Note: The data is sent in chunks, and each chunk contains multiple 
+   * records. So the result iterator is an iterator (I) of iterator (II),
+   * i.e., go through each chunk (iterator (I)), and for each chunk, go 
+   * through each record (iterator (II)). 
+   */
+  private lazy val resultIterator = new Iterator[Array[Object]] {
+
+    private var currentIterator: Iterator[Array[Object]] = nextIterator()
+    
+    override def hasNext: Boolean = {
+      if (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator()
+      currentIterator.hasNext
+    }
+
+    /** Note: make sure call `hasNext` first to adjust `currentIterator` */
+    override def next(): Array[Object] = currentIterator.next()
+  }
+  
+  /** get the iterator for the next chunk of data */
+  private def nextIterator(): Iterator[Array[Object]] = {
+    val chunk: Array[Byte] = queue.take
+    if (chunk.isEmpty) {
+      Iterator.empty
+    } else {
+      val input = new ByteArrayDataInput()
+      input.initialize(chunk, Version.CURRENT)
+      val chunkType = input.readByte()
+      // println(s"chunk type $chunkType")
+      chunkType match {
+        case TYPE_CHUNK =>
+          if (structType == null)
+            structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl]
+          nextIterator()
+        case DATA_CHUNK =>
+          // require(structType != null && structType.getFieldNames.length > 0)
+          if (structType == null) structType = StructStreamingResultSender.KeyValueType
+          chunkToIterator(input, structType.getFieldNames.length)
+        case ERROR_CHUNK => 
+          val error = DataSerializer.readObject(input).asInstanceOf[Exception]
+          errorPropagationIterator(error)
+        case _ => throw new RuntimeException(s"unknown chunk type: $chunkType")
+      }
+    }
+  }
+
+  /** create a iterator that propagate sender's exception */
+  private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] {
+    val re = new RuntimeException(ex)
+    override def hasNext: Boolean = throw re
+    override def next(): Array[Object] = throw re
+  }
+  
+  /** convert a chunk of data to an iterator */
+  private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] {
+    override def hasNext: Boolean = input.available() > 0
+    val tmpInput = new ByteArrayDataInput()
+    override def next(): Array[Object] = 
+      (0 until rowSize).map { ignore =>
+        val b = input.readByte()
+        b match {
+          case SER_DATA => 
+            val arr: Array[Byte] = DataSerializer.readByteArray(input)
+            tmpInput.initialize(arr, Version.CURRENT)
+            DataSerializer.readObject(tmpInput).asInstanceOf[Object]
+          case UNSER_DATA =>
+            DataSerializer.readObject(input).asInstanceOf[Object]
+          case BYTEARR_DATA =>
+            DataSerializer.readByteArray(input).asInstanceOf[Object]
+          case _ => 
+            throw new RuntimeException(s"unknown data type $b")
+        }
+      }.toArray
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
new file mode 100644
index 0000000..dd4a40b
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
@@ -0,0 +1,42 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import scala.util.parsing.combinator.RegexParsers
+
+class QueryParser extends RegexParsers {
+
+  def query: Parser[String] = opt(rep(IMPORT ~ PACKAGE)) ~> select ~> opt(distinct) ~> projection ~> from ~> regions <~ opt(where ~ filter) ^^ {
+    _.toString
+  }
+
+  val IMPORT: Parser[String] = "[Ii][Mm][Pp][Oo][Rr][Tt]".r
+
+  val select: Parser[String] = "[Ss][Ee][Ll][Ee][Cc][Tt]".r
+
+  val distinct: Parser[String] = "[Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]".r
+
+  val from: Parser[String] = "[Ff][Rr][Oo][Mm]".r
+
+  val where: Parser[String] = "[Ww][Hh][Ee][Rr][Ee]".r
+
+  def PACKAGE: Parser[String] = """[\w.]+""".r
+
+  def projection: Parser[String] = "*" | repsep("""["\w]+[.\w"]*""".r, ",") ^^ {
+    _.toString
+  }
+
+  def regions: Parser[String] = repsep(region <~ opt(alias), ",") ^^ {
+    _.toString
+  }
+
+  def region: Parser[String] = """/[\w.]+[/[\w.]+]*""".r | """[\w]+[.\w]*""".r
+
+  def alias: Parser[String] = not(where) ~> """[\w]+""".r
+
+  def filter: Parser[String] = """[\w.]+[[\s]+[<>=.'\w]+]*""".r
+}
+
+object QueryParser extends QueryParser {
+
+  def parseOQL(expression: String) = parseAll(query, expression)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
new file mode 100644
index 0000000..7c0d6a9
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
@@ -0,0 +1,67 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
+import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDPartition, ServerSplitsPartitioner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{TaskContext, SparkContext, Partition}
+import scala.reflect.ClassTag
+
+/**
+ * An RDD that provides the functionality that read the OQL query result
+ *
+ * @param sc The SparkContext this RDD is associated with
+ * @param queryString The OQL query string
+ * @param connConf The GemFireConnectionConf that provide the GemFireConnection
+ */
+class QueryRDD[T](@transient sc: SparkContext,
+                  queryString: String,
+                  connConf: GemFireConnectionConf)
+                 (implicit ct: ClassTag[T])
+  extends RDD[T](sc, Seq.empty) {
+
+  override def getPartitions: Array[Partition] = {
+    val conn = connConf.getConnection
+    val regionPath = getRegionPathFromQuery(queryString)
+    val md = conn.getRegionMetadata(regionPath)
+    md match {
+      case Some(metadata) =>
+        if (metadata.isPartitioned) {
+          val splits = ServerSplitsPartitioner.partitions(conn, metadata, Map.empty)
+          logInfo(s"QueryRDD.getPartitions():isPartitioned=true, partitions=${splits.mkString(",")}")
+          splits
+        }
+        else {
+          logInfo(s"QueryRDD.getPartitions():isPartitioned=false")
+          Array[Partition](new GemFireRDDPartition(0, Set.empty))
+
+        }
+      case None => throw new RuntimeException(s"Region $regionPath metadata was not found.")
+    }
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    val buckets = split.asInstanceOf[GemFireRDDPartition].bucketSet
+    val regionPath = getRegionPathFromQuery(queryString)
+    val result = connConf.getConnection.executeQuery(regionPath, buckets, queryString)
+    result match {
+      case it: Iterator[T] =>
+        logInfo(s"QueryRDD.compute():query=$queryString, partition=$split")
+        it
+      case _ =>
+        throw new RuntimeException("Unexpected OQL result: " + result.toString)
+    }
+  }
+
+  private def getRegionPathFromQuery(queryString: String): String = {
+    val r = QueryParser.parseOQL(queryString).get
+    r match {
+      case r: String =>
+        val start = r.indexOf("/") + 1
+        var end = r.indexOf(")")
+        if (r.indexOf(".") > 0) end = math.min(r.indexOf("."), end)
+        if (r.indexOf(",") > 0) end = math.min(r.indexOf(","), end)
+        val regionPath = r.substring(start, end)
+        regionPath
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
new file mode 100644
index 0000000..6bbad26
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
@@ -0,0 +1,53 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.distributed.DistributedMember
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
+
+class QueryResultCollector extends ResultCollector[Array[Byte], Iterator[Object]]{
+
+  private val queue = new LinkedBlockingDeque[Array[Byte]]()
+
+  override def getResult = resultIterator
+
+  override def getResult(timeout: Long, unit: TimeUnit) = throw new UnsupportedOperationException
+
+  override def addResult(memberID: DistributedMember , chunk: Array[Byte]) =
+    if (chunk != null && chunk.size > 0) {
+      queue.add(chunk)
+    }
+
+  override def endResults = queue.add(Array.empty)
+
+
+  override def clearResults = queue.clear
+
+  private lazy val resultIterator = new Iterator[Object] {
+    private var currentIterator = nextIterator
+    def hasNext = {
+      if (!currentIterator.hasNext && currentIterator != Iterator.empty)
+        currentIterator = nextIterator
+      currentIterator.hasNext
+    }
+    def next = currentIterator.next
+  }
+
+  private def nextIterator: Iterator[Object] = {
+    val chunk = queue.take
+    if (chunk.isEmpty) {
+      Iterator.empty
+    }
+    else {
+      val input = new ByteArrayDataInput
+      input.initialize(chunk, Version.CURRENT)
+      new Iterator[Object] {
+        override def hasNext: Boolean = input.available() > 0
+        override def next: Object = DataSerializer.readObject(input).asInstanceOf[Object]
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
new file mode 100644
index 0000000..e4f4152
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
@@ -0,0 +1,24 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+
+import scala.tools.nsc.backend.icode.analysis.DataFlowAnalysis
+
+case class OQLRelation[T](queryRDD: QueryRDD[T])(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {
+
+  override def schema: StructType = new SchemaBuilder(queryRDD).toSparkSchema()
+
+  override def buildScan(): RDD[Row] = new RowBuilder(queryRDD).toRowRDD()
+
+}
+
+object RDDConverter {
+
+  def queryRDDToDataFrame[T](queryRDD: QueryRDD[T], sqlContext: SQLContext): DataFrame = {
+    sqlContext.baseRelationToDataFrame(OQLRelation(queryRDD)(sqlContext))
+  }
+}
\ No newline at end of file



Mime
View raw message