geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinmeil...@apache.org
Subject [41/50] [abbrv] incubator-geode git commit: GEODE-1244: Package, directory, project and file rename for geode-spark-connector
Date Thu, 21 Apr 2016 17:17:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
new file mode 100644
index 0000000..a8666fc
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions
+
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl
+import com.gemstone.gemfire.cache.query.types.StructType
+import com.gemstone.gemfire.distributed.DistributedMember
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
+import io.pivotal.geode.spark.connector.internal.geodefunctions.StructStreamingResultSender.
+       {TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA}
+
+/**
+ * StructStreamingResultCollector and StructStreamingResultSender are paired
+ * to transfer a list of `com.gemstone.gemfire.cache.query.Struct` results
+ * from the Geode server to the Spark Connector (the client of the Geode
+ * server) in a streaming fashion, i.e., while the sender is still sending the
+ * result, the collector can start processing the chunks that have already
+ * arrived without waiting for the full result to become available.
+ */
+class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] {
+
+  /** auxiliary constructor that supplies the default `desc` (description) */
+  def this() = this("StructStreamingResultCollector")
+  
+  private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]()
+  var structType: StructType = null
+
+  /** ------------------------------------------ */
+  /**  ResultCollector interface implementations */
+  /** ------------------------------------------ */
+  
+  override def getResult: Iterator[Array[Object]] = resultIterator
+
+  override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] = 
+    throw new UnsupportedOperationException()
+
+  /** addResult adds a chunk to the queue; null chunks and chunks of fewer than 2 bytes are dropped */
+  override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit = 
+    if (chunk != null && chunk.size > 1) {
+      this.queue.add(chunk)
+      // println(s"""$desc receive from $memberID: ${chunk.mkString(" ")}""")
+    }
+
+  /** endResults adds the special `Array.empty` to the queue as the end-of-data marker */
+  override def endResults(): Unit = this.queue.add(Array.empty)
+  
+  override def clearResults(): Unit = this.queue.clear()
+
+  /** ------------------------------------------ */
+  /**             Internal methods               */
+  /** ------------------------------------------ */
+
+  def getResultType: StructType = {
+    // structType is set while chunks are read, so force the lazy resultIterator to read if necessary
+    if (structType == null)  resultIterator.hasNext
+    structType        
+  }
+
+  /**
+   * Note: The data is sent in chunks, and each chunk contains multiple
+   * records. So the result iterator is an iterator (I) over iterators (II),
+   * i.e., it goes through each chunk (iterator (I)), and for each chunk it
+   * goes through each record (iterator (II)).
+   */
+  private lazy val resultIterator = new Iterator[Array[Object]] {
+
+    private var currentIterator: Iterator[Array[Object]] = nextIterator()
+    
+    override def hasNext: Boolean = {
+      if (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator()
+      currentIterator.hasNext
+    }
+
+    /** Note: make sure to call `hasNext` first so that `currentIterator` is advanced to the next chunk */
+    override def next(): Array[Object] = currentIterator.next()
+  }
+  
+  /** get the iterator for the next chunk of data; blocks (`queue.take`) until a chunk is available */
+  private def nextIterator(): Iterator[Array[Object]] = {
+    val chunk: Array[Byte] = queue.take
+    if (chunk.isEmpty) {
+      Iterator.empty
+    } else {
+      val input = new ByteArrayDataInput()
+      input.initialize(chunk, Version.CURRENT)
+      val chunkType = input.readByte()
+      // println(s"chunk type $chunkType")
+      chunkType match {
+        case TYPE_CHUNK =>
+          if (structType == null)
+            structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl]
+          nextIterator()
+        case DATA_CHUNK =>
+          // no type chunk has been read yet: fall back to the sender's KeyValueType
+          if (structType == null) structType = StructStreamingResultSender.KeyValueType
+          chunkToIterator(input, structType.getFieldNames.length)
+        case ERROR_CHUNK => 
+          val error = DataSerializer.readObject(input).asInstanceOf[Exception]
+          errorPropagationIterator(error)
+        case _ => throw new RuntimeException(s"unknown chunk type: $chunkType")
+      }
+    }
+  }
+
+  /** create an iterator that rethrows the sender's exception (wrapped in a RuntimeException) from `hasNext` and `next` */
+  private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] {
+    val re = new RuntimeException(ex)
+    override def hasNext: Boolean = throw re
+    override def next(): Array[Object] = throw re
+  }
+  
+  /** convert a chunk of data to an iterator of rows, each row holding `rowSize` deserialized values */
+  private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] {
+    override def hasNext: Boolean = input.available() > 0
+    val tmpInput = new ByteArrayDataInput()
+    override def next(): Array[Object] = 
+      (0 until rowSize).map { ignore =>
+        val b = input.readByte()
+        b match {
+          case SER_DATA => 
+            val arr: Array[Byte] = DataSerializer.readByteArray(input)
+            tmpInput.initialize(arr, Version.CURRENT)
+            DataSerializer.readObject(tmpInput).asInstanceOf[Object]
+          case UNSER_DATA =>
+            DataSerializer.readObject(input).asInstanceOf[Object]
+          case BYTEARR_DATA =>
+            DataSerializer.readByteArray(input).asInstanceOf[Object]
+          case _ => 
+            throw new RuntimeException(s"unknown data type $b")
+        }
+      }.toArray
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
new file mode 100644
index 0000000..3f6dfad
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import scala.util.parsing.combinator.RegexParsers
+
+class QueryParser extends RegexParsers {
+  /** Whole OQL query; the parse result is the string form of the `regions` list. */
+  def query: Parser[String] = {
+    val head = opt(rep(IMPORT ~ PACKAGE)) ~> select ~> opt(distinct) ~> projection ~> from
+    head ~> regions <~ opt(where ~ filter) ^^ (_.toString)
+  }
+
+  // OQL keywords, matched without regard to letter case
+  val IMPORT: Parser[String] = "[Ii][Mm][Pp][Oo][Rr][Tt]".r
+  val select: Parser[String] = "[Ss][Ee][Ll][Ee][Cc][Tt]".r
+  val distinct: Parser[String] = "[Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]".r
+  val from: Parser[String] = "[Ff][Rr][Oo][Mm]".r
+  val where: Parser[String] = "[Ww][Hh][Ee][Rr][Ee]".r
+
+  /** Name made of word characters and dots that follows an IMPORT keyword. */
+  def PACKAGE: Parser[String] = """[\w.]+""".r
+
+  /** Either `*` or a comma-separated list of names (word characters, dots, double quotes). */
+  def projection: Parser[String] = "*" | repsep("""["\w]+[.\w"]*""".r, ",") ^^ (_.toString)
+
+  /** Comma-separated regions; an alias after a region is accepted but dropped. */
+  def regions: Parser[String] = repsep(region <~ opt(alias), ",") ^^ (_.toString)
+
+  /** Region path that starts with a slash, or a plain dotted name. */
+  def region: Parser[String] = """/[\w.]+[/[\w.]+]*""".r | """[\w]+[.\w]*""".r
+
+  /** Alias word; it must not start with the WHERE keyword. */
+  def alias: Parser[String] = not(where) ~> """[\w]+""".r
+
+  /** Condition text that follows the WHERE keyword. */
+  def filter: Parser[String] = """[\w.]+[[\s]+[<>=.'\w]+]*""".r
+}
+
+object QueryParser extends QueryParser {
+  /** Parse `expression` with the `query` rule; `parseAll` requires the whole input to match. */
+  def parseOQL(expression: String): ParseResult[String] =
+    parseAll(query, expression)
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
new file mode 100644
index 0000000..474aa6a
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDPartition, ServerSplitsPartitioner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{TaskContext, SparkContext, Partition}
+import scala.reflect.ClassTag
+
+/**
+ * An RDD that reads the result of an OQL query
+ *
+ * @param sc The SparkContext this RDD is associated with
+ * @param queryString The OQL query string
+ * @param connConf The GeodeConnectionConf that provides the GeodeConnection
+ */
+class QueryRDD[T](@transient sc: SparkContext,
+                  queryString: String,
+                  connConf: GeodeConnectionConf)
+                 (implicit ct: ClassTag[T])
+  extends RDD[T](sc, Seq.empty) {
+  /** Partitions from ServerSplitsPartitioner for a partitioned region, else one partition. */
+  override def getPartitions: Array[Partition] = {
+    val conn = connConf.getConnection
+    val regionPath = getRegionPathFromQuery(queryString)
+    val md = conn.getRegionMetadata(regionPath)
+    md match {
+      case Some(metadata) =>
+        if (metadata.isPartitioned) {
+          val splits = ServerSplitsPartitioner.partitions(conn, metadata, Map.empty)
+          logInfo(s"QueryRDD.getPartitions():isPartitioned=true, partitions=${splits.mkString(",")}")
+          splits
+        }
+        else {
+          logInfo(s"QueryRDD.getPartitions():isPartitioned=false")
+          Array[Partition](new GeodeRDDPartition(0, Set.empty))
+        }
+      case None => throw new RuntimeException(s"Region $regionPath metadata was not found.")
+    }
+  }
+
+  /** Run the query for the buckets of `split`; fails unless the result is an Iterator. */
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    val buckets = split.asInstanceOf[GeodeRDDPartition].bucketSet
+    val regionPath = getRegionPathFromQuery(queryString)
+    val result = connConf.getConnection.executeQuery(regionPath, buckets, queryString)
+    result match {
+      case it: Iterator[T] =>
+        logInfo(s"QueryRDD.compute():query=$queryString, partition=$split")
+        it
+      case _ =>
+        throw new RuntimeException("Unexpected OQL result: " + result.toString)
+    }
+  }
+
+  /** Path, without leading "/", of the first region in the parser's "List(/a, ...)" output. */
+  private def getRegionPathFromQuery(queryString: String): String = {
+    val parsed = QueryParser.parseOQL(queryString)
+    if (!parsed.successful)
+      throw new RuntimeException(s"Unable to parse OQL query '$queryString': $parsed")
+    val r = parsed.get
+    // path = text after "List(" and an optional leading "/", up to the first ".", "," or ")"
+    val afterParen = r.indexOf("(") + 1
+    val start = if (r.startsWith("/", afterParen)) afterParen + 1 else afterParen
+    val end = Seq(".", ",", ")").map(r.indexOf(_, start)).filter(_ >= 0).min
+    r.substring(start, end)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
new file mode 100644
index 0000000..bedc58d
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.distributed.DistributedMember
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
+
+/** Collects byte-array chunks of a query result and exposes the objects in them as one iterator. */
+class QueryResultCollector extends ResultCollector[Array[Byte], Iterator[Object]] {
+  private val queue = new LinkedBlockingDeque[Array[Byte]]()
+
+  override def getResult: Iterator[Object] = resultIterator
+  override def getResult(timeout: Long, unit: TimeUnit): Iterator[Object] =
+    throw new UnsupportedOperationException
+
+  /** Queue a chunk; null and empty chunks are dropped (the empty array is the end marker). */
+  override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit =
+    if (chunk != null && chunk.size > 0) {
+      queue.add(chunk)
+    }
+
+  /** Queue the empty array that marks the end of data. */
+  override def endResults(): Unit = queue.add(Array.empty)
+  override def clearResults(): Unit = queue.clear()
+
+  /** Iterator over all chunks; call `hasNext` before `next` so the current chunk is advanced. */
+  private lazy val resultIterator = new Iterator[Object] {
+    private var currentIterator = nextIterator()
+    override def hasNext: Boolean = {
+      // move on to the next chunk unless the end marker has already been reached
+      if (!currentIterator.hasNext && currentIterator != Iterator.empty)
+        currentIterator = nextIterator()
+      currentIterator.hasNext
+    }
+    override def next(): Object = currentIterator.next()
+  }
+
+  /** Objects of the next queued chunk (blocks in `queue.take`); `Iterator.empty` at the end marker. */
+  private def nextIterator(): Iterator[Object] = {
+    val chunk = queue.take
+    if (chunk.isEmpty) Iterator.empty
+    else {
+      val input = new ByteArrayDataInput
+      input.initialize(chunk, Version.CURRENT)
+      new Iterator[Object] {
+        override def hasNext: Boolean = input.available() > 0
+        override def next(): Object = DataSerializer.readObject(input).asInstanceOf[Object]
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
new file mode 100644
index 0000000..6a1611c
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+
+import scala.tools.nsc.backend.icode.analysis.DataFlowAnalysis
+
+case class OQLRelation[T](queryRDD: QueryRDD[T])(@transient val sqlContext: SQLContext)
+  extends BaseRelation with TableScan {
+  /** Schema that [[SchemaBuilder]] derives from `queryRDD`. */
+  override def schema: StructType = new SchemaBuilder(queryRDD).toSparkSchema()
+  /** Full scan: `queryRDD` converted to an RDD of Rows by [[RowBuilder]]. */
+  override def buildScan(): RDD[Row] = new RowBuilder(queryRDD).toRowRDD()
+}
+
+object RDDConverter {
+  /** Wrap `queryRDD` in an [[OQLRelation]] and ask `sqlContext` for the matching DataFrame. */
+  def queryRDDToDataFrame[T](queryRDD: QueryRDD[T], sqlContext: SQLContext): DataFrame =
+    sqlContext.baseRelationToDataFrame(
+      OQLRelation(queryRDD)(sqlContext))
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
new file mode 100644
index 0000000..e54411c
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+
+class RowBuilder[T](queryRDD: QueryRDD[T]) {
+
+  /**
+   * Convert QueryRDD to RDD of Row: a struct result becomes a Row of its field
+   * values, any other result (including null) becomes a single-column Row.
+   * @return RDD of Rows
+   */
+  def toRowRDD(): RDD[Row] =
+    queryRDD.map {
+      case si: StructImpl => Row.fromSeq(si.getFieldValues)
+      // catch-all instead of a type pattern: a null result has to match as well
+      // (SchemaBuilder describes a null result as one NullType column)
+      case other => Row.fromSeq(Seq(other))
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
new file mode 100644
index 0000000..3ca20b7
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import org.apache.spark.sql.types._
+import scala.collection.mutable.ListBuffer
+import org.apache.spark.Logging
+
+class SchemaBuilder[T](queryRDD: QueryRDD[T]) extends Logging {
+
+  /** Struct type without fields; stands in for values whose class is not in `typeMap` */
+  val nullStructType = StructType(Nil)
+
+  /** Spark SQL type of each directly supported Java class (looked up by the exact class) */
+  val typeMap:Map[Class[_], DataType] = Map(
+    classOf[java.lang.String] -> StringType,
+    classOf[java.lang.Integer] -> IntegerType,
+    classOf[java.lang.Short] -> ShortType,
+    classOf[java.lang.Long] -> LongType,
+    classOf[java.lang.Double] -> DoubleType,
+    classOf[java.lang.Float] -> FloatType,
+    classOf[java.lang.Boolean] -> BooleanType,
+    classOf[java.lang.Byte] -> ByteType,
+    classOf[java.util.Date] -> DateType,
+    classOf[java.lang.Object] -> nullStructType
+  )
+
+  /**
+   * Analyse QueryRDD to get the Spark schema. Only the first element of the RDD
+   * is inspected: a struct yields one column per field, anything else a single
+   * column named "col1".
+   * @return The schema represented by Spark StructType
+   */
+  def toSparkSchema(): StructType = {
+    val schema = queryRDD.first() match {
+      case struct: StructImpl => constructFromStruct(struct)
+      case single => StructType(StructField("col1", sparkTypeOf(single)) :: Nil)
+    }
+    logInfo(s"Schema: $schema")
+    schema
+  }
+
+  /**
+   * Build the schema of a struct result
+   * @return one StructField per struct field, typed after the field's value in `r`
+   */
+  def constructFromStruct(r:StructImpl) = {
+    val names = r.getFieldNames
+    val values = r.getFieldValues
+    StructType(names.indices.map(i => StructField(names(i), sparkTypeOf(values(i)))))
+  }
+
+  /** NullType for null, otherwise the `typeMap` entry of the value's class or `nullStructType` */
+  private def sparkTypeOf(value: Any): DataType = value match {
+    case null => NullType
+    case other => typeMap.getOrElse(other.getClass, nullStructType)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
new file mode 100644
index 0000000..37dec42
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.oql
+
+import com.esotericsoftware.kryo.{Kryo, Serializer}
+import com.esotericsoftware.kryo.io.{Output, Input}
+import com.gemstone.gemfire.cache.query.QueryService
+import com.gemstone.gemfire.cache.query.internal.Undefined
+
+/**
+ * Customized Kryo serializer for QueryService.UNDEFINED, i.e.
+ * com.gemstone.gemfire.cache.query.internal.Undefined. It guarantees that
+ * Undefined is still the singleton after it is deserialized within Spark.
+ */
+class UndefinedSerializer extends Serializer[Undefined] {
+
+  /** Write the single DSFID byte that stands for Undefined */
+  def write(kryo: Kryo, output: Output, u: Undefined): Unit = {
+    output.writeByte(u.getDSFID)
+  }
+
+  /** Consume the DSFID byte and return the Undefined instance */
+  def read (kryo: Kryo, input: Input, tpe: Class[Undefined]): Undefined = {
+    // the byte only has to be consumed, its value is not needed
+    input.readByte()
+    // reuse QueryService.UNDEFINED when it is set, so that the Undefined
+    // constructor is not called again
+    Option(QueryService.UNDEFINED)
+      .map(_.asInstanceOf[Undefined])
+      .getOrElse(new Undefined)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
new file mode 100644
index 0000000..e9dd658
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions._
+
+/**
+ * An `RDD[(T, V)]` that represents the result of a join between the `left`
+ * RDD[T] and the specified Geode Region[K, V].
+ */
+class GeodeJoinRDD[T, K, V] private[connector]
+  ( left: RDD[T],
+    func: T => K,
+    val regionPath: String,
+    val connConf: GeodeConnectionConf
+  ) extends RDD[(T, V)](left.context, left.dependencies) {
+
+  /** check that the region exists as soon as the GeodeJoinRDD object is created */
+  validate()
+
+  /** Ask the connection to validate the region at `regionPath`. */
+  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
+
+  override protected def getPartitions: Array[Partition] = left.partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = {
+    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
+    if (func != null) computeWithFunc(split, context, region)
+    else computeWithoutFunc(split, context, region)
+  }
+
+  /** No key function `func`: T itself is a (K, V1) pair whose first element is the key */
+  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
+    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
+    val leftKeys = leftPairs.map(_._1).toSet
+    // Note: getAll returns (key, null) for keys without an entry, so those pairs are dropped
+    val rightPairs = region.getAll(leftKeys).filter { case (_, v) => v != null }
+    leftPairs.collect { case (k, v) if rightPairs.contains(k) => ((k, v).asInstanceOf[T], rightPairs(k)) }.toIterator
+  }
+
+  /** The join key of every left element is computed with `func` */
+  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
+    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
+    val leftKeys = leftPairs.map(_._2).toSet
+    // Note: getAll returns (key, null) for keys without an entry, so those pairs are dropped
+    val rightPairs = region.getAll(leftKeys).filter { case (_, v) => v != null }
+    leftPairs.collect { case (t, k) if rightPairs.contains(k) => (t, rightPairs(k)) }.toIterator
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
new file mode 100644
index 0000000..3d61d47
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions._
+
+/**
+ * The result of a left outer join between `left` (an RDD[T]) and the Geode Region[K, V] at
+ * `regionPath`, as `(T, Option[V])` pairs; `func` may be null when T is itself a (key, value) pair.
+ */
+class GeodeOuterJoinRDD[T, K, V] private[connector]
+ ( left: RDD[T],
+   func: T => K,
+   val regionPath: String,
+   val connConf: GeodeConnectionConf
+  ) extends RDD[(T, Option[V])](left.context, left.dependencies) {
+
+  // validate region existence as soon as the RDD object is created
+  validate()
+
+  /** Validate region, and make sure it exists. */
+  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
+
+  override protected def getPartitions: Array[Partition] = left.partitions // same partitions as `left`
+
+  /** Joins one partition of `left` with the region, keyed through `func` when it is not null. */
+  override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = {
+    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
+    if (func != null) computeWithFunc(split, context, region) else computeWithoutFunc(split, context, region)
+  }
+
+  /** Used when there's no map function `func`: T is (K1, V1), and K1 and K are the same type. */
+  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
+    val pairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
+    val keys = pairs.map { case (key, _) => key }.toSet
+    // Note: get all will return (key, null) for non-exist entry
+    val fetched = region.getAll(keys)
+    // fetched is a java.util.Map, not scala map, so its get() result is wrapped in Option (null becomes None)
+    pairs.map { case (key, value) => ((key, value).asInstanceOf[T], Option(fetched.get(key))) }.toIterator
+  }
+
+  /** Pairs every element of `left` with the optional region value stored under `func(element)`. */
+  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
+    val keyed = left.iterator(split, context).toList.map(elem => (elem, func(elem)))
+    val keys = keyed.map { case (_, key) => key }.toSet
+    // get all returns a java.util.Map with (key, null) for non-exist entry; Option turns that null into None
+    val fetched = region.getAll(keys)
+    keyed.map { case (elem, key) => (elem, Option(fetched.get(key))) }.toIterator
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
new file mode 100644
index 0000000..24fe72e
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import org.apache.spark.Partition
+
+/**
+ * This serializable class represents a GeodeRDD partition. Each partition is mapped
+ * to one or more buckets of region. The GeodeRDD can materialize the data of the
+ * partition based on all information contained here.
+ * @param partitionId partition id, a 0 based number; also returned by `index`.
+ * @param bucketSet region bucket id set for this partition. Set.empty means whole
+ *                  region (used for replicated region)
+ * @param locations preferred locations for this partition; defaults to Nil.
+ */
+case class GeodeRDDPartition (
+  partitionId: Int, bucketSet: Set[Int], locations: Seq[String] = Nil)
+  extends Partition  {
+  
+  override def index: Int = partitionId
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
new file mode 100644
index 0000000..d960cab
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import io.pivotal.geode.spark.connector.GeodeConnection
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import org.apache.spark.{Logging, Partition}
+
+import scala.reflect.ClassTag
+
+/**
+ * A GeodeRDD partitioner is used to partition the region into multiple RDD partitions.
+ */
+trait GeodeRDDPartitioner extends Serializable {
+  /** the name of this partitioner; the companion object's `partitioners` map is keyed by it */
+  def name: String
+  
+  /** the function that generates partitions from the connection, the region metadata `md` and the settings in `env` */
+  def partitions[K: ClassTag, V: ClassTag]
+    (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition]
+}
+
+/** Registry of the available partitioners, together with the default choices. */
+object GeodeRDDPartitioner extends Logging {
+
+  /** All known partitioners keyed by name. To add new partitioner, just add it to the following list */
+  final val partitioners: Map[String, GeodeRDDPartitioner] =
+    List(OnePartitionPartitioner, ServerSplitsPartitioner).map(p => p.name -> p).toMap
+
+  /**
+   * Looks up a partitioner by name. An unknown name is logged as a warning and answered
+   * with `defaultPartitionedRegionPartitioner`, whose name is also the default for `name`.
+   */
+  def apply(name: String = defaultPartitionedRegionPartitioner.name): GeodeRDDPartitioner =
+    partitioners.getOrElse(name, {
+      logWarning(s"Invalid preferred partitioner name $name.")
+      defaultPartitionedRegionPartitioner
+    })
+
+  /** The default partitioner for replicated regions. */
+  val defaultReplicatedRegionPartitioner = OnePartitionPartitioner
+
+  /** The default partitioner for partitioned regions. */
+  val defaultPartitionedRegionPartitioner = ServerSplitsPartitioner
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
new file mode 100644
index 0000000..4606114
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import io.pivotal.geode.spark.connector.GeodeConnection
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey
+import org.apache.spark.Partition
+import scala.collection.JavaConversions._
+import scala.collection.immutable.SortedSet
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+/** This partitioner maps whole region to one GeodeRDDPartition */
+object OnePartitionPartitioner extends GeodeRDDPartitioner {
+  override val name = "OnePartition"
+
+  /** Ignores its arguments: the result is always the single partition 0 with an empty bucket set. */
+  override def partitions[K: ClassTag, V: ClassTag]
+    (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] =
+    Array[Partition](GeodeRDDPartition(partitionId = 0, bucketSet = Set.empty))
+}
+
+/**
+  * This partitioner maps the whole region to at most N * M Geode RDD partitions, where M is the number of
+  * Geode servers that contain the data for the given region. The default value of N is 2.
+  */
+object ServerSplitsPartitioner extends GeodeRDDPartitioner {
+
+  override val name = "ServerSplits"
+
+  /** One whole-region partition if `md` is not partitioned or lists no server buckets, else `doPartitions`. */
+  override def partitions[K: ClassTag, V: ClassTag]
+  (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = {
+    if (md == null) throw new RuntimeException("RegionMetadata is null")
+    val n = try { env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt } catch { case e: NumberFormatException => 2 }
+    if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty)
+      Array[Partition](new GeodeRDDPartition(0, Set.empty))
+    else {
+      val map = mapAsScalaMap(md.getServerBucketMap)
+        .map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList
+        .map { case (srv, set) => (srv.getHostName, set) }
+       doPartitions(map, md.getTotalBuckets, n)
+    }
+  }
+
+  /** Converts server to bucket ID set list to array of RDD partitions; an `n` below 1 is treated as 1. */
+  def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int)
+    : Array[Partition] = {
+    // a negative `n` would give a negative group size, which `grouped` rejects with an exception
+    val splits = scala.math.max(n, 1)
+    // method that calculates the group size for splitting "k" items into "g" groups
+    def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt
+
+    // 1. convert list of server and bucket set pairs to a list of server and sorted bucket set pairs
+    val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) }
+
+    // 2. split bucket set of each server into `splits` groups if possible, and server to Seq(server)
+    val srvToSplitBucketSet = srvToSortedBucketSet.flatMap { case (host, set) =>
+      if (set.isEmpty) Nil else set.grouped(groupSize(set.size, splits)).toList.map(s => (Seq(host), s)) }
+
+    // 3. calculate empty bucket IDs by removing all bucket sets of all servers from the full bucket sets
+    val emptyIDs = SortedSet[Int]() ++ srvToSortedBucketSet.foldLeft((0 until totalBuckets).toSet) { case (ids, (_, owned)) => ids &~ owned }
+
+    // 4. distribute empty bucket IDs to all partitions evenly. The empty buckets do not contain data when
+    //    partitions are created, but they may when RDD is materialized, so those IDs must be included.
+    val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitBucketSet
+      else srvToSplitBucketSet.zipAll(
+        emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitBucketSet.size)).toList, (Nil, Set.empty), Set.empty).map
+          { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) }
+
+    // 5. create array of partitions w/ 0-based index
+    srvToFinalBucketSet.zipWithIndex.map { case ((srv, set), i) => new GeodeRDDPartition(i, set, srv) }.toArray
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
new file mode 100644
index 0000000..dba15f3
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector._
+import org.apache.spark.{Logging, TaskContext}
+
+import scala.collection.Iterator
+import java.util.{HashMap => JMap}
+
+/** This abstract class provides some common code for pair and non-pair RDD writer */
+private[rdd] abstract class GeodeRDDWriterBase (opConf: Map[String, String]) extends Serializable {
+  /** Save batch size; the default is used when unset, not an integer, or below 1 (`Iterator.grouped` needs >= 1). */
+  val batchSize = try { opConf.get(RDDSaveBatchSizePropKey).map(_.toInt).filter(_ > 0).getOrElse(RDDSaveBatchSizeDefault) }
+                  catch { case _: NumberFormatException => RDDSaveBatchSizeDefault }
+  /** Renders at most `num + 1` entries of `map`, appending " ..." when more than `num` entries were taken. */
+  def mapDump(map: Map[_, _], num: Int): String = {
+    val firstNum = map.take(num + 1)
+    if (firstNum.size > num) s"$firstNum ..." else s"$firstNum"
+  }
+}
+
+/**
+ * Writer whose `write` function saves the partitions of a non-pair RDD to Geode.
+ * That function will be executed on Spark executors.
+ * @param regionPath the full path of the region where the data is written to
+ */
+class GeodeRDDWriter[T, K, V]
+  (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
+  extends GeodeRDDWriterBase(opConf) with Serializable with Logging {
+
+  /** Maps each element of `data` to a (key, value) pair with `func` and puts the pairs in batches of `batchSize`. */
+  def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = {
+    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
+    val count = data.grouped(batchSize).foldLeft(0) { (saved, chunk) =>
+      val batch = new JMap[K, V]()
+      chunk.foreach { elem => val (k, v) = func(elem); batch.put(k, v) }
+      region.putAll(batch)
+      saved + chunk.length
+    }
+    logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath")
+  }
+}
+
+
+/**
+ * Writer object that provides write function that saves pair RDD partitions to Geode.
+ * Those functions will be executed on Spark executors.
+ * @param regionPath the full path of the region where the data is written to
+ */
+class GeodePairRDDWriter[K, V]
+  (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
+  extends GeodeRDDWriterBase(opConf) with Serializable with Logging {
+
+  /** Puts the pairs of `data` into the region in batches of `batchSize`, then logs the total with the `batch.size` label. */
+  def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = {
+    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
+    val count = data.grouped(batchSize).foldLeft(0) { (saved, chunk) =>
+      val batch = new JMap[K, V]()
+      chunk.foreach { case (k, v) => batch.put(k, v) }
+      region.putAll(batch)
+      saved + chunk.length
+    }
+    logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
new file mode 100644
index 0000000..6980c0f
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.rdd
+
+import scala.collection.Seq
+import scala.reflect.ClassTag
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{TaskContext, Partition, SparkContext}
+import io.pivotal.geode.spark.connector.{GeodeConnectionConf, PreferredPartitionerPropKey}
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
+
+/**
+ * This class exposes a Geode region as an RDD of (key, value) pairs.
+ * @param sc the Spark Context
+ * @param regionPath the full path of the region
+ * @param connConf the GeodeConnectionConf to access the region
+ * @param opConf the parameters for this operation, such as preferred partitioner.
+ * @param whereClause optional where clause that is passed on when the region data is fetched
+ */
+class GeodeRegionRDD[K, V] private[connector]
+  (@transient sc: SparkContext,
+   val regionPath: String,
+   val connConf: GeodeConnectionConf,
+   val opConf: Map[String, String] = Map.empty,
+   val whereClause: Option[String] = None
+  ) (implicit ctk: ClassTag[K], ctv: ClassTag[V])
+  extends RDD[(K, V)](sc, Seq.empty) {
+
+  // validate region existence as soon as the GeodeRDD object is created
+  validate()
+
+  /** Validate region, and make sure it exists. */
+  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
+
+  /** The class tags of the key type and of the value type, as received by the constructor. */
+  def kClassTag = ctk
+  def vClassTag = ctv
+
+  /**
+   * Creates a new immutable GeodeRDD instance based on this instance; used by the `where` methods.
+   * Note that `whereClause` defaults to None here, not to the clause of this instance.
+   */
+  private def copy(
+    regionPath: String = regionPath,
+    connConf: GeodeConnectionConf = connConf,
+    opConf: Map[String, String] = opConf,
+    whereClause: Option[String] = None
+  ): GeodeRegionRDD[K, V] = {
+
+    require(sc != null,
+    """RDD transformation requires a non-null SparkContext. Unfortunately
+      |SparkContext in this GeodeRDD is null. This can happen after 
+      |GeodeRDD has been deserialized. SparkContext is not Serializable,
+      |therefore it deserializes to null. RDD transformations are not allowed
+      |inside lambdas used in other RDD transformations.""".stripMargin )
+
+    new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf, whereClause)
+  }
+
+  /** When where clause is specified, OQL query
+    * `select key, value from /<region-path>.entries where <where clause> `
+    * is used to filter the dataset. An empty Option returns this RDD unchanged.
+    */
+  def where(whereClause: Option[String]): GeodeRegionRDD[K, V] = whereClause match {
+    case Some(_) => copy(whereClause = whereClause)
+    case None => this
+  }
+
+  /** this version is for Java API that doesn't use scala.Option; a null or blank clause returns this RDD */
+  def where(whereClause: String): GeodeRegionRDD[K, V] = {
+    val clause = Option(whereClause).map(_.trim).filter(_.nonEmpty)
+    clause.map(text => copy(whereClause = Option(text))).getOrElse(this)
+  }
+
+  /**
+   * Generates the partitions with the preferred partitioner when the region is partitioned;
+   * `defaultReplicatedRegionPartitioner` will be used if it's a replicated region.
+   * Throws RuntimeException when no metadata is found for the region.
+   */
+  override def getPartitions: Array[Partition] = {
+    val conn = connConf.getConnection
+    val data = conn.getRegionMetadata[K, V](regionPath).getOrElse(
+      throw new RuntimeException(s"region $regionPath was not found."))
+    logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""")
+    val partitioner = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner
+    val splits = partitioner.partitions[K, V](conn, data, opConf)
+    logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n  ${splits.mkString("\n  ")}""")
+    splits
+  }
+
+  /**
+   * provide preferred location(s) (host name(s)) of the given partition.
+   * Only some partitioner implementation(s) provides this info, which is
+   * useful when Spark cluster and Geode cluster share some hosts.
+   */
+  override def getPreferredLocations(split: Partition) =
+    split.asInstanceOf[GeodeRDDPartition].locations
+
+  /**
+   * Get preferred partitioner: the one named by `PreferredPartitionerPropKey` in `opConf`,
+   * or `defaultPartitionedRegionPartitioner` if no preference is specified.
+   */
+  private def preferredPartitioner = {
+    val defaultName = GeodeRDDPartitioner.defaultPartitionedRegionPartitioner.name
+    GeodeRDDPartitioner(opConf.getOrElse(PreferredPartitionerPropKey, defaultName))
+  }
+
+  /** materialize a RDD partition by fetching the region data that belongs to it */
+  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val geodePartition = split.asInstanceOf[GeodeRDDPartition]
+    logDebug(s"compute RDD id=${this.id} partition $geodePartition")
+    connConf.getConnection.getRegionData[K,V](regionPath, whereClause, geodePartition)
+    // new InterruptibleIterator(context, split.asInstanceOf[GeodeRDDPartition[K, V]].iterator)
+  }
+}
+
+/** Companion object offering a factory for GeodeRegionRDD instances without a where clause. */
+object GeodeRegionRDD {
+  /** Creates a GeodeRegionRDD over `regionPath`; the class tags of K and V come from the context bounds. */
+  def apply[K: ClassTag, V: ClassTag](sc: SparkContext, regionPath: String,
+    connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
+    : GeodeRegionRDD[K, V] =
+    new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf)
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
new file mode 100644
index 0000000..f859173
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi
+
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD
+import org.apache.spark.api.java.JavaPairRDD
+
+/** A JavaPairRDD view of a GeodeRegionRDD, constructed with that RDD's key and value class tags. */
+class GeodeJavaRegionRDD[K, V](rdd: GeodeRegionRDD[K, V]) extends JavaPairRDD[K, V](rdd)(rdd.kClassTag, rdd.vClassTag) {
+  /** Applies `whereClause` to the wrapped GeodeRegionRDD and wraps the resulting RDD again. */
+  def where(whereClause: String): GeodeJavaRegionRDD[K, V] = new GeodeJavaRegionRDD[K, V](rdd.where(whereClause))
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
new file mode 100644
index 0000000..ffa6195
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi
+
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream}
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+/**
+ *  A helper object that makes components written in Scala accessible from Java code.
+ */
+private[connector] object JavaAPIHelper {
+
+  /** Returns a `ClassTag` of a given runtime class. */
+  def getClassTag[T](clazz: Class[T]): ClassTag[T] = ClassTag[T](clazz)
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   * see JavaSparkContext.fakeClassTag in Spark for more info.
+   */
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+  /** Converts a Java `Properties` to a Scala immutable `Map[String, String]`; the type parameters are unused. */
+  def propertiesToScalaMap[K, V](props: java.util.Properties): Map[String, String] =
+    propertiesAsScalaMap(props).toMap
+
+  /** Converts a JavaRDD[(K,V)] to a JavaPairRDD[K,V]. */
+  def toJavaPairRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] =
+    JavaPairRDD.fromJavaRDD(rdd)
+
+  /** Converts a JavaDStream[(K,V)] to a JavaPairDStream[K,V]. */
+  def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] =
+    JavaPairDStream.fromJavaDStream(ds)
+
+  /** an empty Map[String, String] for default opConf **/
+  val emptyStrStrMap: Map[String, String] = Map.empty
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
new file mode 100644
index 0000000..6f9a780
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark
+
+import io.pivotal.geode.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+import scala.reflect.ClassTag
+
+/**
+ * Root package of the Geode connector for Apache Spark.
+ * It holds the shared configuration keys and the implicit conversions that
+ * attach geode-specific methods to `SparkContext`, `SQLContext` and `RDD`.
+ */
+package object connector {
+
+  /** constants */
+  final val GeodeLocatorPropKey = "spark.geode.locators"
+  // partitioner related keys and values
+  final val PreferredPartitionerPropKey = "preferred.partitioner"
+  final val NumberPartitionsPerServerPropKey = "number.partitions.per.server"
+  final val OnePartitionPartitionerName = OnePartitionPartitioner.name
+  final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name
+
+  // batch size key and default used when saving an RDD
+  final val RDDSaveBatchSizePropKey = "rdd.save.batch.size"
+  final val RDDSaveBatchSizeDefault = 10000
+
+  /** implicits */
+
+  /** Wraps a `SparkContext` in [[GeodeSparkContextFunctions]]. */
+  implicit def toSparkContextFunctions(sc: SparkContext): GeodeSparkContextFunctions =
+    new GeodeSparkContextFunctions(sc)
+
+  /** Wraps a `SQLContext` in [[GeodeSQLContextFunctions]]. */
+  implicit def toSQLContextFunctions(sqlContext: SQLContext): GeodeSQLContextFunctions =
+    new GeodeSQLContextFunctions(sqlContext)
+
+  /** Wraps an RDD of `(K, V)` tuples in [[GeodePairRDDFunctions]]. */
+  implicit def toGeodePairRDDFunctions[K: ClassTag, V: ClassTag]
+    (self: RDD[(K, V)]): GeodePairRDDFunctions[K, V] = new GeodePairRDDFunctions[K, V](self)
+
+  /** Wraps any RDD in [[GeodeRDDFunctions]]. */
+  implicit def toGeodeRDDFunctions[T: ClassTag]
+    (self: RDD[T]): GeodeRDDFunctions[T] = new GeodeRDDFunctions[T](self)
+
+  /** utility implicits */
+
+  /** Copies every entry of a Map[String, String] into a new java.util.Properties. */
+  implicit def map2Properties(map: Map[String,String]): java.util.Properties =
+    map.foldLeft(new java.util.Properties) { case (acc, (key, value)) =>
+      acc.put(key, value)
+      acc
+    }
+
+  /** internal util methods */
+
+  /** Renders one "index: partition loc=preferredLocations" entry per partition, joined by `sep`. */
+  private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n  "): String = {
+    val entries = rdd.partitions.zipWithIndex.map { case (p, i) => s"$i: $p loc=${rdd.preferredLocations(p)}" }
+    entries.mkString(sep)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
new file mode 100644
index 0000000..4d46429
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.streaming
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodePairRDDWriter, GeodeRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Geode operations for a DStream whose elements are not already key/value pairs.
+ * They are reached through an implicit conversion: import
+ * `io.pivotal.geode.spark.connector.streaming._` at the top of your program to
+ * use them.
+ */
+class GeodeDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging {
+
+  /**
+   * Writes the DStream to a Geode region, converting each element to a key/value pair
+   * with `func`. The region is validated once up front; after that a Spark job is run
+   * for every RDD of the stream.
+   * @param regionPath the full path of region that the DStream is stored
+   * @param func the function that converts elements of the DStream to key/value pairs
+   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+   * @param opConf the optional parameters for this operation
+   */
+  def saveToGeode[K, V](
+      regionPath: String,
+      func: T => (K, V),
+      connConf: GeodeConnectionConf = defaultConnectionConf,
+      opConf: Map[String, String] = Map.empty): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    val rddWriter = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
+    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
+    dstream.foreachRDD { batch =>
+      batch.sparkContext.runJob(batch, rddWriter.write(func) _)
+    }
+  }
+
+  /** Java API variant: adapts the `PairFunction` to a Scala function and delegates to the overload above. */
+  def saveToGeode[K, V](
+      regionPath: String,
+      func: PairFunction[T, K, V],
+      connConf: GeodeConnectionConf,
+      opConf: Map[String, String] ): Unit = {
+    val scalaFunc: T => (K, V) = func.call _
+    saveToGeode[K, V](regionPath, scalaFunc, connConf, opConf)
+  }
+
+  /** Connection configuration built from the SparkConf of the DStream's SparkContext. */
+  private[connector] def defaultConnectionConf: GeodeConnectionConf = {
+    val sparkConf = dstream.context.sparkContext.getConf
+    GeodeConnectionConf(sparkConf)
+  }
+}
+
+
+/**
+ * Geode operations for a DStream of (key, value) pairs.
+ * They are reached through an implicit conversion: import
+ * `io.pivotal.geode.spark.connector.streaming._` at the top of your program to
+ * use them.
+ */
+class GeodePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging {
+
+  /**
+   * Writes the DStream of pairs to a Geode region as-is, without converting the elements.
+   * The region is validated once up front; after that a Spark job is run for every RDD
+   * of the stream.
+   * @param regionPath the full path of region that the DStream is stored
+   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+   * @param opConf the optional parameters for this operation
+   */
+  def saveToGeode(
+      regionPath: String,
+      connConf: GeodeConnectionConf = defaultConnectionConf,
+      opConf: Map[String, String] = Map.empty): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    val pairWriter = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
+    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
+    dstream.foreachRDD { batch =>
+      batch.sparkContext.runJob(batch, pairWriter.write _)
+    }
+  }
+
+  /** Connection configuration built from the SparkConf of the DStream's SparkContext. */
+  private[connector] def defaultConnectionConf: GeodeConnectionConf = {
+    val sparkConf = dstream.context.sparkContext.getConf
+    GeodeConnectionConf(sparkConf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
new file mode 100644
index 0000000..0d1f1eb
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Implicit conversions that add geode-specific methods to `DStream`.
+ */
+package object streaming {
+
+  /** Wraps any `DStream[T]` in a [[GeodeDStreamFunctions]]. */
+  implicit def toGeodeDStreamFunctions[T](ds: DStream[T]): GeodeDStreamFunctions[T] =
+    new GeodeDStreamFunctions(ds)
+
+  /** Wraps a `DStream` of `(K, V)` tuples in a [[GeodePairDStreamFunctions]]. */
+  implicit def toGeodePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GeodePairDStreamFunctions[K, V] =
+    new GeodePairDStreamFunctions(ds)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java b/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java
new file mode 100644
index 0000000..142907e
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector;
+
+import io.pivotal.geode.spark.connector.javaapi.*;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.SQLContext;
+//import org.apache.spark.sql.api.java.JavaSQLContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.dstream.DStream;
+import org.junit.Test;
+import org.scalatest.junit.JUnitSuite;
+import scala.Function1;
+import scala.Function2;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.collection.mutable.LinkedList;
+import scala.reflect.ClassTag;
+
+import static org.junit.Assert.*;
+import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * Mockito-based checks of the Java API wrappers returned by the statically imported
+ * {@code javaFunctions(...)} helpers for Spark contexts, RDDs and DStreams.
+ * Identity checks use {@code assertSame} so that a failure reports both objects.
+ */
+public class JavaAPITest extends JUnitSuite {
+
+  /**
+   * Builds a mocked SparkContext, GeodeConnectionConf and GeodeConnection. The conf mock
+   * returns the connection mock from {@code getConnection()} and an empty list from
+   * {@code locators()}.
+   */
+  @SuppressWarnings( "unchecked" )
+  public Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> createCommonMocks() {
+    SparkContext mockSparkContext = mock(SparkContext.class);
+    GeodeConnectionConf mockConnConf = mock(GeodeConnectionConf.class);
+    GeodeConnection mockConnection = mock(GeodeConnection.class);
+    when(mockConnConf.getConnection()).thenReturn(mockConnection);
+    when(mockConnConf.locators()).thenReturn(new LinkedList());
+    return new Tuple3<>(mockSparkContext, mockConnConf, mockConnection);
+  }
+
+  /** The SparkContext wrapper keeps the given context and validates the region on geodeRegion(). */
+  @Test
+  public void testSparkContextFunction() throws Exception {
+    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
+    GeodeJavaSparkContextFunctions wrapper = javaFunctions(tuple3._1());
+    assertSame(tuple3._1(), wrapper.sc);
+    String regionPath = "testregion";
+    JavaPairRDD<String, String> rdd = wrapper.geodeRegion(regionPath, tuple3._2());
+    verify(tuple3._3()).validateRegion(regionPath);
+  }
+
+  /** Wrapping a JavaSparkContext exposes the underlying SparkContext. */
+  @Test
+  public void testJavaSparkContextFunctions() throws Exception {
+    SparkContext mockSparkContext = mock(SparkContext.class);
+    JavaSparkContext mockJavaSparkContext = mock(JavaSparkContext.class);
+    when(mockJavaSparkContext.sc()).thenReturn(mockSparkContext);
+    GeodeJavaSparkContextFunctions wrapper = javaFunctions(mockJavaSparkContext);
+    assertSame(mockSparkContext, wrapper.sc);
+  }
+
+  /** The JavaPairRDD wrapper holds the underlying RDD and saveToGeode() runs exactly one job on it. */
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaPairRDDFunctions() throws Exception {
+    JavaPairRDD<String, Integer> mockPairRDD = mock(JavaPairRDD.class);
+    RDD<Tuple2<String, Integer>> mockTuple2RDD = mock(RDD.class);
+    when(mockPairRDD.rdd()).thenReturn(mockTuple2RDD);
+    GeodeJavaPairRDDFunctions wrapper = javaFunctions(mockPairRDD);
+    assertSame(mockTuple2RDD, wrapper.rddf.rdd());
+
+    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
+    when(mockTuple2RDD.sparkContext()).thenReturn(tuple3._1());
+    String regionPath = "testregion";
+    wrapper.saveToGeode(regionPath, tuple3._2());
+    verify(mockTuple2RDD, times(1)).sparkContext();
+    verify(tuple3._1(), times(1)).runJob(eq(mockTuple2RDD), any(Function2.class), any(ClassTag.class));
+  }
+
+  /** The JavaRDD wrapper holds the underlying RDD and saveToGeode() with a PairFunction runs exactly one job. */
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaRDDFunctions() throws Exception {
+    JavaRDD<String> mockJavaRDD = mock(JavaRDD.class);
+    RDD<String> mockRDD = mock(RDD.class);
+    when(mockJavaRDD.rdd()).thenReturn(mockRDD);
+    GeodeJavaRDDFunctions wrapper = javaFunctions(mockJavaRDD);
+    assertSame(mockRDD, wrapper.rddf.rdd());
+
+    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
+    when(mockRDD.sparkContext()).thenReturn(tuple3._1());
+    PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class);
+    String regionPath = "testregion";
+    wrapper.saveToGeode(regionPath, mockPairFunc, tuple3._2());
+    verify(mockRDD, times(1)).sparkContext();
+    verify(tuple3._1(), times(1)).runJob(eq(mockRDD), any(Function2.class), any(ClassTag.class));
+  }
+
+  /** The JavaPairDStream wrapper holds the underlying DStream; saveToGeode() validates the region and registers a foreachRDD action. */
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaPairDStreamFunctions() throws Exception {
+    JavaPairDStream<String, String> mockJavaDStream = mock(JavaPairDStream.class);
+    DStream<Tuple2<String, String>> mockDStream = mock(DStream.class);
+    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
+    GeodeJavaPairDStreamFunctions wrapper = javaFunctions(mockJavaDStream);
+    assertSame(mockDStream, wrapper.dsf.dstream());
+
+    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
+    String regionPath = "testregion";
+    wrapper.saveToGeode(regionPath, tuple3._2());
+    verify(tuple3._2()).getConnection();
+    verify(tuple3._3()).validateRegion(regionPath);
+    verify(mockDStream).foreachRDD(any(Function1.class));
+  }
+
+  /** A JavaDStream of Tuple2 converted with toJavaPairDStream() still wraps the same underlying DStream. */
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaPairDStreamFunctionsWithTuple2DStream() throws Exception {
+    JavaDStream<Tuple2<String, String>> mockJavaDStream = mock(JavaDStream.class);
+    DStream<Tuple2<String, String>> mockDStream = mock(DStream.class);
+    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
+    GeodeJavaPairDStreamFunctions wrapper = javaFunctions(toJavaPairDStream(mockJavaDStream));
+    assertSame(mockDStream, wrapper.dsf.dstream());
+  }
+
+  /** The JavaDStream wrapper holds the underlying DStream; saveToGeode() with a PairFunction validates the region and registers a foreachRDD action. */
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaDStreamFunctions() throws Exception {
+    JavaDStream<String> mockJavaDStream = mock(JavaDStream.class);
+    DStream<String> mockDStream = mock(DStream.class);
+    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
+    GeodeJavaDStreamFunctions wrapper = javaFunctions(mockJavaDStream);
+    assertSame(mockDStream, wrapper.dsf.dstream());
+
+    Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks();
+    PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class);
+    String regionPath = "testregion";
+    wrapper.saveToGeode(regionPath, mockPairFunc, tuple3._2());
+    verify(tuple3._2()).getConnection();
+    verify(tuple3._3()).validateRegion(regionPath);
+    verify(mockDStream).foreachRDD(any(Function1.class));
+  }
+
+  /** The SQLContext wrapper's {@code scf} field is exactly a GeodeSQLContextFunctions instance. */
+  @Test
+  public void testSQLContextFunction() throws Exception {
+    SQLContext mockSQLContext = mock(SQLContext.class);
+    GeodeJavaSQLContextFunctions wrapper = javaFunctions(mockSQLContext);
+    assertSame(GeodeSQLContextFunctions.class, wrapper.scf.getClass());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
new file mode 100644
index 0000000..4e45dc2
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.commons.httpclient.HttpClient
+import java.io.File
+
+
+/** Tests for `GeodeFunctionDeployer`; every test constructs the deployer with the same mocked `HttpClient`. */
+class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar {
+  val mockHttpClient: HttpClient = mock[HttpClient]
+
+  // location of a jar file that is expected not to exist
+  private val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
+
+  // fresh deployer backed by the shared mock client
+  private def newDeployer = new GeodeFunctionDeployer(mockHttpClient)
+
+  test("jmx url creation") {
+    val hostAndPort = "localhost:7070"
+    val expected = "http://" + hostAndPort + "/gemfire/v1/deployed"
+    val deployer = newDeployer
+    val actual = deployer.constructURLString(hostAndPort)
+    assert(actual === expected)
+  }
+
+  test("missing jar file") {
+    val deployer = newDeployer
+    intercept[RuntimeException] {
+      deployer.jarFileHandle(missingJarFileLocation)
+    }
+  }
+
+  test("deploy with missing jar") {
+    val deployer = newDeployer
+    // both the "host:port" and the separate host/port overloads must fail
+    intercept[RuntimeException] {
+      deployer.deploy("localhost:7070", missingJarFileLocation).contains("Deployed")
+    }
+    intercept[RuntimeException] {
+      deployer.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed")
+    }
+  }
+
+  test("successful mocked deploy") {
+    val deployer = newDeployer
+    val jarFile = new File("README.md")
+    val response = deployer.deploy("localhost:7070", jarFile)
+    assert(response.contains("Deployed"))
+  }
+
+}


Mime
View raw message