geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasonhu...@apache.org
Subject [08/11] incubator-geode git commit: GEODE-1244: Package, directory, project and file rename for geode-spark-connector
Date Wed, 20 Apr 2016 22:51:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
deleted file mode 100644
index 51f1d01..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import io.pivotal.gemfire.spark.connector.internal.oql.{OQLRelation, QueryRDD}
-import org.apache.spark.Logging
-import org.apache.spark.sql.{DataFrame, SQLContext}
-
-/**
- * Provide GemFire OQL specific functions on top of a `SQLContext`.
- *
- * @param sqlContext the SQLContext used to reach the SparkContext and to build
- *                   the resulting DataFrame (declared `@transient`)
- */
-class GemFireSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging {
-
-  /**
-   * Expose a GemFire OQL query result as a DataFrame
-   * @param query the OQL query string.
-   * @param connConf the connection configuration handed to the `QueryRDD`;
-   *        when omitted, [[defaultConnectionConf]] is used
-   * @return the DataFrame created from an `OQLRelation` wrapping the query RDD
-   */
-  def gemfireOQL(
-    query: String,
-    connConf: GemFireConnectionConf = defaultConnectionConf): DataFrame = {
-    logInfo(s"OQL query = $query")
-    val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
-    sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext))
-  }
-
-  /** Connection configuration built from `sqlContext.sparkContext.getConf` */
-  private[connector] def defaultConnectionConf: GemFireConnectionConf =
-    GemFireConnectionConf(sqlContext.sparkContext.getConf)
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
deleted file mode 100644
index 5341977..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector
-
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD
-import org.apache.spark.SparkContext
-
-import scala.reflect.ClassTag
-
-/** Adds GemFire specific methods to `SparkContext` */
-class GemFireSparkContextFunctions(@transient sc: SparkContext) extends Serializable {
-
-  /**
-   * Build a GemFireRegionRDD on top of a GemFire region.
-   *
-   * @tparam K first type parameter of the returned GemFireRegionRDD
-   * @tparam V second type parameter of the returned GemFireRegionRDD
-   * @param regionPath the full path of the region
-   * @param connConf the GemFireConnectionConf that can be used to access the region;
-   *        by default it is created from `sc.getConf`
-   * @param opConf use this to specify preferred partitioner
-   *        and its parameters. The implementation will use it if it's applicable
-   * @return the RDD produced by `GemFireRegionRDD.apply` for the given arguments
-   */
-  def gemfireRegion[K: ClassTag, V: ClassTag] (
-    regionPath: String,
-    connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf),
-    opConf: Map[String, String] = Map.empty): GemFireRegionRDD[K, V] = {
-    val regionRdd = GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf)
-    regionRdd
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
deleted file mode 100644
index 7d147b2..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal
-
-import java.net.InetAddress
-
-import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
-import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
-import com.gemstone.gemfire.cache.query.Query
-import com.gemstone.gemfire.cache.{Region, RegionService}
-import com.gemstone.gemfire.internal.cache.execute.InternalExecution
-import io.pivotal.gemfire.spark.connector.internal.oql.QueryResultCollector
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition
-import org.apache.spark.{SparkEnv, Logging}
-import io.pivotal.gemfire.spark.connector.GemFireConnection
-import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions._
-import java.util.{Set => JSet, List => JList }
-
-/**
- * Default GemFireConnection implementation. The instance of this should be
- * created by DefaultGemFireConnectionFactory
- * @param locators pairs of host/port of locators
- * @param gemFireProps The initial gemfire properties to be used.
- */
-private[connector] class DefaultGemFireConnection (
-  locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty)
-  extends GemFireConnection with Logging {
-
-  private val clientCache = initClientCache()
-
-  /** Register the connector's region functions with the FunctionService */
-  FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
-  FunctionService.registerFunction(RetrieveRegionFunction.getInstance())
-
-  /** Create the ClientCache; a failure is logged and rethrown wrapped in a RuntimeException */
-  private def initClientCache() : ClientCache = {
-    try {
-      getClientCacheFactory.create()
-    } catch {
-      case e: Exception =>
-        logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""")
-        throw new RuntimeException(e)
-    }
-  }
-
-  /**
-   * Build the ClientCacheFactory. When the locators report at least one server,
-   * the preferred servers are added to the pool directly; otherwise the locators
-   * themselves are added to the pool.
-   */
-  private def getClientCacheFactory: ClientCacheFactory = {
-    // implicit Map => Properties conversion needed by the ClientCacheFactory constructor
-    import io.pivotal.gemfire.spark.connector.map2Properties
-    val ccf = new ClientCacheFactory(gemFireProps)
-    ccf.setPoolReadTimeout(30000)
-    LocatorHelper.getAllGemFireServers(locators).filter(_.nonEmpty) match {
-      case Some(allServers) =>
-        // SPARK_LOCAL_IP, when set, takes precedence over the local host address
-        val localAddress = Option(System.getenv("SPARK_LOCAL_IP"))
-          .map(ip => InetAddress.getByName(ip))
-          .getOrElse(InetAddress.getLocalHost)
-        val hostName = localAddress.getCanonicalHostName
-        val executorId = SparkEnv.get.executorId
-        val pickedServers = LocatorHelper.pickPreferredGemFireServers(allServers, hostName, executorId)
-        logInfo(s"""Init ClientCache: servers=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""")
-        // log the complete server list here, not just the picked subset
-        logDebug(s"""Init ClientCache: all-servers=${allServers.mkString(",")}""")
-        pickedServers.foreach { case (host, port) => ccf.addPoolServer(host, port) }
-      case None =>
-        logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
-        locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) }
-    }
-    ccf
-  }
-
-  /** close the clientCache (no-op if it is already closed) */
-  override def close(): Unit =
-    if (! clientCache.isClosed) clientCache.close()
-
-  /** ----------------------------------------- */
-  /** implementation of GemFireConnection trait */
-  /** ----------------------------------------- */
-
-  /** Create a Query for the given OQL string through the cache's query service */
-  override def getQuery(queryString: String): Query =
-    clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)
-
-  /** Throw a RuntimeException if no metadata can be retrieved for the region */
-  override def validateRegion[K, V](regionPath: String): Unit =
-    if (getRegionMetadata[K, V](regionPath).isEmpty)
-      throw new RuntimeException(s"The region named $regionPath was not found")
-
-  /**
-   * Execute RetrieveRegionMetadataFunction on the region (bucket filter = {0})
-   * and return the first element of its result.
-   * @return None when the function fails with a "region ... was not found"
-   *         FunctionException; any other FunctionException is rethrown
-   */
-  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
-    import scala.collection.JavaConverters._
-    val region = getRegionProxy[K, V](regionPath)
-    val set0: JSet[Integer] = Set[Integer](Integer.valueOf(0)).asJava
-    val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0)
-    exec.setWaitOnExceptionFlag(true)
-    try {
-      val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
-      val r = collector.getResult.asInstanceOf[JList[RegionMetadata]]
-      logDebug(r.get(0).toString)
-      Some(r.get(0))
-    } catch {
-      case e: FunctionException =>
-        // getMessage may be null; treat a missing message as "not a region-not-found error"
-        val regionMissing =
-          Option(e.getMessage).exists(_.contains(s"The region named /$regionPath was not found"))
-        if (regionMissing) None else throw e
-    }
-  }
-
-  /**
-   * Return the region known to the client cache under `regionPath`, creating a
-   * PROXY client region if there is none. Creation is guarded by a lock shared by
-   * all connections, and the lookup is repeated once the lock is held.
-   */
-  def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
-    def existing(): Option[Region[K, V]] =
-      Option(clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]])
-
-    existing().getOrElse {
-      DefaultGemFireConnection.regionLock.synchronized {
-        existing().getOrElse(
-          clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath))
-      }
-    }
-  }
-
-  /**
-   * Execute RetrieveRegionFunction on the buckets of the given partition and
-   * expose the streamed result as an iterator of (key, value) pairs.
-   * @param whereClause optional filter; an empty string is sent when it is None
-   * @param split the partition whose `bucketSet` is used as bucket filter
-   */
-  override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GemFireRDDPartition): Iterator[(K, V)] = {
-    import scala.collection.JavaConverters._
-    val region = getRegionProxy[K, V](regionPath)
-    val where = whereClause.getOrElse("")
-    val desc = s"""RDD($regionPath, "$where", ${split.index})"""
-    val args : Array[String] = Array[String](where, desc)
-    val collector = new StructStreamingResultCollector(desc)
-    val buckets: JSet[Integer] = split.bucketSet.map(b => Integer.valueOf(b)).asJava
-    val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
-      .withBucketFilter(buckets)
-    exec.setWaitOnExceptionFlag(true)
-    exec.execute(RetrieveRegionFunction.ID)
-    collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V])}
-  }
-
-  /**
-   * Register and execute QueryFunction on the given buckets of the region and
-   * return the result of the QueryResultCollector.
-   */
-  override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = {
-    import scala.collection.JavaConverters._
-    FunctionService.registerFunction(QueryFunction.getInstance())
-    val collector = new QueryResultCollector
-    val region = getRegionProxy(regionPath)
-    val args: Array[String] = Array[String](queryString, bucketSet.toString)
-    val buckets: JSet[Integer] = bucketSet.map(b => Integer.valueOf(b)).asJava
-    val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
-      .withBucketFilter(buckets)
-      .withArgs(args)
-    exec.execute(QueryFunction.ID)
-    collector.getResult
-  }
-}
-
-private[connector] object DefaultGemFireConnection {
-  /** a lock object only used by getRegionProxy...() */
-  private val regionLock = new Object
-}
-
-/**
- * Factory for DefaultGemFireConnection. It exists to make unit testing
- * DefaultGemFireConnectionManager easier.
- */
-class DefaultGemFireConnectionFactory {
-
-  /** Create a new DefaultGemFireConnection for the given locators and gemfire properties */
-  def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) = {
-    val connection = new DefaultGemFireConnection(locators, gemFireProps)
-    connection
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
deleted file mode 100644
index 6722ca8..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal
-
-import io.pivotal.gemfire.spark.connector.{GemFireConnection, GemFireConnectionConf, GemFireConnectionManager}
-
-import scala.collection.mutable
-
-/**
- * Default implementation of GemFireConnectionManager. Both methods delegate
- * to the companion object, which owns the connection cache.
- */
-class DefaultGemFireConnectionManager extends GemFireConnectionManager {
-
-  def getConnection(connConf: GemFireConnectionConf): GemFireConnection =
-    DefaultGemFireConnectionManager.getConnection(connConf)
-
-  def closeConnection(connConf: GemFireConnectionConf): Unit =
-    DefaultGemFireConnectionManager.closeConnection(connConf)
-
-}
-
-object DefaultGemFireConnectionManager  {
-
-  /** connection cache, keyed by host:port pair */
-  private[connector] val connections = mutable.Map[(String, Int), GemFireConnection]()
-
-  /**
-   * First cached connection registered under any of the given locators, in
-   * locator order. The cache is a plain mutable map, so callers must hold the
-   * `connections` lock while calling this.
-   */
-  private def cachedConnection(locators: Seq[(String, Int)]): Option[GemFireConnection] =
-    locators.flatMap(pair => connections.get(pair)).headOption
-
-  /**
-   * use locator host:port pair to lookup cached connection. create new connection
-   * and add it to the cache `connections` if it does not exist.
-   *
-   * The lookup and the insertion happen under the same lock, so the cache is
-   * never read while another thread is modifying it.
-   *
-   * @param connConf supplies the locators (cache keys) and the gemfire properties
-   * @param factory creates the connection on a cache miss
-   */
-  def getConnection(connConf: GemFireConnectionConf)
-    (implicit factory: DefaultGemFireConnectionFactory = new DefaultGemFireConnectionFactory): GemFireConnection =
-    connections.synchronized {
-      cachedConnection(connConf.locators).getOrElse {
-        val created = factory.newConnection(connConf.locators, connConf.gemfireProps)
-        // register the same connection under every locator of this configuration
-        connConf.locators.foreach(pair => connections += (pair -> created))
-        created
-      }
-    }
-
-  /**
-   * Close the connection and remove it from connection cache.
-   * Note: multiple entries may share the same connection, all those entries are removed.
-   * Nothing happens when no connection is cached for the given locators.
-   */
-  def closeConnection(connConf: GemFireConnectionConf): Unit =
-    connections.synchronized {
-      cachedConnection(connConf.locators).foreach { conn =>
-        conn.close()
-        connections.retain((k, v) => v != conn)
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
deleted file mode 100644
index 1d72775..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal
-
-import java.net.InetSocketAddress
-import java.util.{ArrayList => JArrayList}
-
-import com.gemstone.gemfire.cache.client.internal.locator.{GetAllServersResponse, GetAllServersRequest}
-import com.gemstone.gemfire.distributed.internal.ServerLocation
-import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient
-import org.apache.spark.Logging
-
-import scala.util.{Failure, Success, Try}
-
-
-object LocatorHelper extends Logging {
-
-  /** valid locator strings are: host[port] and host:port */
-  final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
-  final val LocatorPattern2 = """([\w-_]+(\.[\w-_]+)*):([0-9]{2,5})""".r
-
-  /**
-   * convert single locator string to Try[(host, port)]
-   * @return Success((host, port)) when the string matches one of the two locator
-   *         patterns, otherwise a Failure carrying an "invalid locator" exception
-   */
-  def locatorStr2HostPortPair(locatorStr: String): Try[(String, Int)] =
-    locatorStr match {
-      // the middle capture group (last domain segment) is not needed
-      case LocatorPattern1(host, _, port) => Success((host, port.toInt))
-      case LocatorPattern2(host, _, port) => Success((host, port.toInt))
-      case _ => Failure(new Exception(s"invalid locator: $locatorStr"))
-    }
-
-  /**
-   * Parse locator strings and returns Seq of (hostname, port) pair.
-   * Valid locator string are one or more "host[port]" and/or "host:port"
-   * separated by `,`. For example:
-   *    host1.mydomain.com[8888],host2.mydomain.com[8889]
-   *    host1.mydomain.com:8888,host2.mydomain.com:8889
-   * The exception of the first invalid locator is thrown if any entry is invalid.
-   */
-  def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
-    locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)
-
-
-  /**
-   * Return the list of live GemFire servers for the given locators.
-   * Locators are asked in order; the answer of the first one that reports at
-   * least one server is returned. A locator that fails is logged and skipped.
-   * @param locators locators for the given GemFire cluster
-   * @param serverGroup optional server group name, default is "" (empty string)
-   * @return None when no locator reported any server
-   */
-  def getAllGemFireServers(locators: Seq[(String, Int)], serverGroup: String = ""): Option[Seq[(String, Int)]] = {
-    import scala.collection.JavaConverters._
-
-    // ask a single locator; None if it fails, answers null, or reports no server
-    def serversFrom(host: String, port: Int): Option[Seq[(String, Int)]] =
-      try {
-        val addr = new InetSocketAddress(host, port)
-        val req = new GetAllServersRequest(serverGroup)
-        val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, req, 2000)
-        Option(res)
-          .map(_.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]])
-          .filter(_.size > 0)
-          .map(_.asScala.map(e => (e.getHostName, e.getPort)))
-      } catch {
-        case e: Exception =>
-          logWarning("getAllGemFireServers error", e)
-          None
-      }
-
-    // the iterator is lazy, so locators after the first successful one are not contacted
-    locators.iterator
-      .map { case (host, port) => serversFrom(host, port) }
-      .collectFirst { case Some(found) => found }
-  }
-
-  /**
-   * Pick up at most 3 preferred servers from all available servers based on
-   * host name and Spark executor id.
-   *
-   * This method is used by DefaultGemFireConnection to create ClientCache. Usually
-   * one server is enough to initialize ClientCacheFactory, but this provides two
-   * backup servers in case of the 1st server can't be connected.
-   *
-   * @param servers all available servers in the form of (hostname, port) pairs
-   * @param hostName the host name of the Spark executor
-   * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ...
-   * @return Seq[(hostname, port)] of preferred servers
-   */
-  def pickPreferredGemFireServers(
-    servers: Seq[(String, Int)], hostName: String, executorId: String): Seq[(String, Int)] = {
-
-    // pick up `length` items from the Seq starting at the `start` position.
-    //  The Seq is treated as a ring, so at most `Seq.size` items can be picked.
-    //  `start` is normalized into [0, size) first, so a negative or very large
-    //  start can neither produce a negative index nor overflow the range bounds.
-    def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] =
-      if (seq.isEmpty) Seq.empty
-      else {
-        val n = seq.size
-        val first = ((start % n) + n) % n
-        (0 until math.min(n, length)).map(i => seq((first + i) % n))
-      }
-
-    // map executor id to int: "<driver>" (or non-number string) to 0, and "n" to n + 1
-    val id = Try(executorId.toInt + 1).getOrElse(0)
-
-    // algorithm:
-    // 1. sort server list
-    // 2. split sorted server list into 3 sub-lists a, b, and c:
-    //      list-a: servers on the given host
-    //      list-b: servers that are in front of list-a on the sorted server list
-    //      list-c: servers that are behind list-a on the sorted server list
-    //    then rotate list-a based on executor id, then create new server list:
-    //      modified list-a ++ list-c ++ list-b
-    // 3. if there's no server on the given host, then create new server list
-    //    by rotating sorted server list based on executor id.
-    // 4. take up to 3 servers from the new server list
-    val sortedServers = servers.sorted
-    val firstIdx = sortedServers.indexWhere(_._1 == hostName)
-
-    if (firstIdx < 0) { // no local server
-      circularTake(sortedServers, id, 3)
-    } else {
-      val lastIdx = sortedServers.lastIndexWhere(_._1 == hostName)
-      val (before, fromLocal) = sortedServers.splitAt(firstIdx)
-      val (local, after) = fromLocal.splitAt(lastIdx - firstIdx + 1)
-      // with a single local server the rotation is a no-op, so one code path covers both cases
-      val rotatedLocal = circularTake(local, id, local.size)
-      circularTake(rotatedLocal ++ after ++ before, 0, 3)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
deleted file mode 100644
index 39bc0cc..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions
-
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
-import com.gemstone.gemfire.DataSerializer
-import com.gemstone.gemfire.cache.execute.ResultCollector
-import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl
-import com.gemstone.gemfire.cache.query.types.StructType
-import com.gemstone.gemfire.distributed.DistributedMember
-import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
-import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.StructStreamingResultSender.
-       {TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA}
-
-/**
- * StructStreamingResultCollector and StructStreamingResultSender are paired
- * to transfer result of list of `com.gemstone.gemfire.cache.query.Struct`
- * from GemFire server to Spark Connector (the client of GemFire server) 
- * in streaming, i.e., while sender sending the result, the collector can
- * start processing the arrived result without waiting for full result to
- * become available.
- *
- * Incoming chunks are buffered in a blocking queue by `addResult` and
- * decoded on demand by the iterator returned from `getResult`.
- *
- * @param desc description of this collector; in this class it is only
- *             referenced by a commented-out debug statement
- */
-class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] {
-
-  /** the constructor that provide default `desc` (description) */
-  def this() = this("StructStreamingResultCollector")
-  
-  // raw chunks in arrival order; an empty array marks the end of the data
-  private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]()
-  // set while decoding chunks (see nextIterator); null until then
-  var structType: StructType = null
-
-  /** ------------------------------------------ */
-  /**  ResultCollector interface implementations */
-  /** ------------------------------------------ */
-  
-  /** return the (lazily created) iterator over all received rows */
-  override def getResult: Iterator[Array[Object]] = resultIterator
-
-  /** the timed variant is not supported and always throws UnsupportedOperationException */
-  override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] = 
-    throw new UnsupportedOperationException()
-
-  /**
-   * addResult adds a chunk to the queue if it is non-null and longer than one
-   * byte; null, empty and single-byte chunks are silently dropped.
-   */
-  override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit = 
-    if (chunk != null && chunk.size > 1) {
-      this.queue.add(chunk)
-      // println(s"""$desc receive from $memberID: ${chunk.mkString(" ")}""")
-    }
-
-  /** endResults add special `Array.empty` to the queue as marker of end of data */
-  override def endResults(): Unit = this.queue.add(Array.empty)
-  
-  /** discard all chunks currently waiting in the queue */
-  override def clearResults(): Unit = this.queue.clear()
-
-  /** ------------------------------------------ */
-  /**             Internal methods               */
-  /** ------------------------------------------ */
-
-  /**
-   * Return the struct type of the result. If it is not known yet, the result
-   * iterator is probed with `hasNext`, which decodes chunks from the queue and
-   * may therefore block until a chunk is available.
-   */
-  def getResultType: StructType = {
-    // trigger lazy resultIterator initialization if necessary
-    if (structType == null)  resultIterator.hasNext
-    structType        
-  }
-
-  /**
-   * Note: The data is sent in chunks, and each chunk contains multiple 
-   * records. So the result iterator is an iterator (I) of iterator (II),
-   * i.e., go through each chunk (iterator (I)), and for each chunk, go 
-   * through each record (iterator (II)). 
-   */
-  private lazy val resultIterator = new Iterator[Array[Object]] {
-
-    // creating the iterator already takes the first chunk from the queue
-    private var currentIterator: Iterator[Array[Object]] = nextIterator()
-    
-    override def hasNext: Boolean = {
-      // `Iterator.empty` is what nextIterator() returns for the end-of-data marker;
-      // once it is current, no further chunk is taken from the queue
-      if (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator()
-      currentIterator.hasNext
-    }
-
-    /** Note: make sure call `hasNext` first to adjust `currentIterator` */
-    override def next(): Array[Object] = currentIterator.next()
-  }
-  
-  /**
-   * get the iterator for the next chunk of data
-   *
-   * Takes one chunk from the queue (`queue.take` waits until one is available)
-   * and dispatches on its first byte, the chunk type.
-   */
-  private def nextIterator(): Iterator[Array[Object]] = {
-    val chunk: Array[Byte] = queue.take
-    if (chunk.isEmpty) {
-      // end-of-data marker added by endResults()
-      Iterator.empty
-    } else {
-      val input = new ByteArrayDataInput()
-      input.initialize(chunk, Version.CURRENT)
-      val chunkType = input.readByte()
-      // println(s"chunk type $chunkType")
-      chunkType match {
-        case TYPE_CHUNK =>
-          // a type chunk carries no rows: remember the type (first one wins) and move on to the next chunk
-          if (structType == null)
-            structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl]
-          nextIterator()
-        case DATA_CHUNK =>
-          // require(structType != null && structType.getFieldNames.length > 0)
-          // no type chunk was seen before the data: fall back to the sender's KeyValueType
-          if (structType == null) structType = StructStreamingResultSender.KeyValueType
-          chunkToIterator(input, structType.getFieldNames.length)
-        case ERROR_CHUNK => 
-          // the chunk carries a serialized exception from the sender
-          val error = DataSerializer.readObject(input).asInstanceOf[Exception]
-          errorPropagationIterator(error)
-        case _ => throw new RuntimeException(s"unknown chunk type: $chunkType")
-      }
-    }
-  }
-
-  /**
-   * create a iterator that propagate sender's exception: both `hasNext` and
-   * `next` throw the same RuntimeException wrapping `ex`
-   */
-  private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] {
-    val re = new RuntimeException(ex)
-    override def hasNext: Boolean = throw re
-    override def next(): Array[Object] = throw re
-  }
-  
-  /**
-   * convert a chunk of data to an iterator
-   *
-   * @param input the chunk, positioned right after the chunk-type byte
-   * @param rowSize number of fields read for every row
-   */
-  private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] {
-    // there is another row as long as unread bytes remain in the chunk
-    override def hasNext: Boolean = input.available() > 0
-    // reused for every SER_DATA field
-    val tmpInput = new ByteArrayDataInput()
-    override def next(): Array[Object] = 
-      (0 until rowSize).map { ignore =>
-        // every field starts with a one-byte tag telling how its payload is encoded
-        val b = input.readByte()
-        b match {
-          case SER_DATA => 
-            // payload is a byte array that itself holds a serialized object
-            val arr: Array[Byte] = DataSerializer.readByteArray(input)
-            tmpInput.initialize(arr, Version.CURRENT)
-            DataSerializer.readObject(tmpInput).asInstanceOf[Object]
-          case UNSER_DATA =>
-            // payload is read directly as an object from the chunk
-            DataSerializer.readObject(input).asInstanceOf[Object]
-          case BYTEARR_DATA =>
-            // payload is a byte array that is returned as is
-            DataSerializer.readByteArray(input).asInstanceOf[Object]
-          case _ => 
-            throw new RuntimeException(s"unknown data type $b")
-        }
-      }.toArray
-  }
-  
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
deleted file mode 100644
index 71e1823..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.oql
-
-import scala.util.parsing.combinator.RegexParsers
-
-class QueryParser extends RegexParsers {
-
-  def query: Parser[String] = opt(rep(IMPORT ~ PACKAGE)) ~> select ~> opt(distinct) ~> projection ~> from ~> regions <~ opt(where ~ filter) ^^ {
-    _.toString
-  }
-
-  val IMPORT: Parser[String] = "[Ii][Mm][Pp][Oo][Rr][Tt]".r
-
-  val select: Parser[String] = "[Ss][Ee][Ll][Ee][Cc][Tt]".r
-
-  val distinct: Parser[String] = "[Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]".r
-
-  val from: Parser[String] = "[Ff][Rr][Oo][Mm]".r
-
-  val where: Parser[String] = "[Ww][Hh][Ee][Rr][Ee]".r
-
-  def PACKAGE: Parser[String] = """[\w.]+""".r
-
-  def projection: Parser[String] = "*" | repsep("""["\w]+[.\w"]*""".r, ",") ^^ {
-    _.toString
-  }
-
-  def regions: Parser[String] = repsep(region <~ opt(alias), ",") ^^ {
-    _.toString
-  }
-
-  def region: Parser[String] = """/[\w.]+[/[\w.]+]*""".r | """[\w]+[.\w]*""".r
-
-  def alias: Parser[String] = not(where) ~> """[\w]+""".r
-
-  def filter: Parser[String] = """[\w.]+[[\s]+[<>=.'\w]+]*""".r
-}
-
-object QueryParser extends QueryParser {
-
-  def parseOQL(expression: String) = parseAll(query, expression)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
deleted file mode 100644
index c16a70d..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.oql
-
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
-import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDPartition, ServerSplitsPartitioner}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, SparkContext, Partition}
-import scala.reflect.ClassTag
-
-/**
- * An RDD that provides the functionality that read the OQL query result
- *
- * @param sc The SparkContext this RDD is associated with
- * @param queryString The OQL query string
- * @param connConf The GemFireConnectionConf that provide the GemFireConnection
- */
-class QueryRDD[T](@transient sc: SparkContext,
-                  queryString: String,
-                  connConf: GemFireConnectionConf)
-                 (implicit ct: ClassTag[T])
-  extends RDD[T](sc, Seq.empty) {
-
-  override def getPartitions: Array[Partition] = {
-    val conn = connConf.getConnection
-    val regionPath = getRegionPathFromQuery(queryString)
-    val md = conn.getRegionMetadata(regionPath)
-    md match {
-      case Some(metadata) =>
-        if (metadata.isPartitioned) {
-          val splits = ServerSplitsPartitioner.partitions(conn, metadata, Map.empty)
-          logInfo(s"QueryRDD.getPartitions():isPartitioned=true, partitions=${splits.mkString(",")}")
-          splits
-        }
-        else {
-          logInfo(s"QueryRDD.getPartitions():isPartitioned=false")
-          Array[Partition](new GemFireRDDPartition(0, Set.empty))
-
-        }
-      case None => throw new RuntimeException(s"Region $regionPath metadata was not found.")
-    }
-  }
-
-  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    val buckets = split.asInstanceOf[GemFireRDDPartition].bucketSet
-    val regionPath = getRegionPathFromQuery(queryString)
-    val result = connConf.getConnection.executeQuery(regionPath, buckets, queryString)
-    result match {
-      case it: Iterator[T] =>
-        logInfo(s"QueryRDD.compute():query=$queryString, partition=$split")
-        it
-      case _ =>
-        throw new RuntimeException("Unexpected OQL result: " + result.toString)
-    }
-  }
-
-  private def getRegionPathFromQuery(queryString: String): String = {
-    val r = QueryParser.parseOQL(queryString).get
-    r match {
-      case r: String =>
-        val start = r.indexOf("/") + 1
-        var end = r.indexOf(")")
-        if (r.indexOf(".") > 0) end = math.min(r.indexOf("."), end)
-        if (r.indexOf(",") > 0) end = math.min(r.indexOf(","), end)
-        val regionPath = r.substring(start, end)
-        regionPath
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
deleted file mode 100644
index 7032e5a..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.oql
-
-import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
-
-import com.gemstone.gemfire.DataSerializer
-import com.gemstone.gemfire.cache.execute.ResultCollector
-import com.gemstone.gemfire.distributed.DistributedMember
-import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
-
-class QueryResultCollector extends ResultCollector[Array[Byte], Iterator[Object]]{
-
-  private val queue = new LinkedBlockingDeque[Array[Byte]]()
-
-  override def getResult = resultIterator
-
-  override def getResult(timeout: Long, unit: TimeUnit) = throw new UnsupportedOperationException
-
-  override def addResult(memberID: DistributedMember , chunk: Array[Byte]) =
-    if (chunk != null && chunk.size > 0) {
-      queue.add(chunk)
-    }
-
-  override def endResults = queue.add(Array.empty)
-
-
-  override def clearResults = queue.clear
-
-  private lazy val resultIterator = new Iterator[Object] {
-    private var currentIterator = nextIterator
-    def hasNext = {
-      if (!currentIterator.hasNext && currentIterator != Iterator.empty)
-        currentIterator = nextIterator
-      currentIterator.hasNext
-    }
-    def next = currentIterator.next
-  }
-
-  private def nextIterator: Iterator[Object] = {
-    val chunk = queue.take
-    if (chunk.isEmpty) {
-      Iterator.empty
-    }
-    else {
-      val input = new ByteArrayDataInput
-      input.initialize(chunk, Version.CURRENT)
-      new Iterator[Object] {
-        override def hasNext: Boolean = input.available() > 0
-        override def next: Object = DataSerializer.readObject(input).asInstanceOf[Object]
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
deleted file mode 100644
index 894066c..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.oql
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.sources.{BaseRelation, TableScan}
-
-import scala.tools.nsc.backend.icode.analysis.DataFlowAnalysis
-
-case class OQLRelation[T](queryRDD: QueryRDD[T])(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {
-
-  override def schema: StructType = new SchemaBuilder(queryRDD).toSparkSchema()
-
-  override def buildScan(): RDD[Row] = new RowBuilder(queryRDD).toRowRDD()
-
-}
-
-object RDDConverter {
-
-  def queryRDDToDataFrame[T](queryRDD: QueryRDD[T], sqlContext: SQLContext): DataFrame = {
-    sqlContext.baseRelationToDataFrame(OQLRelation(queryRDD)(sqlContext))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
deleted file mode 100644
index 3a4fa6a..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.oql
-
-import com.gemstone.gemfire.cache.query.internal.StructImpl
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-
-class RowBuilder[T](queryRDD: QueryRDD[T]) {
-
-  /**
-   * Convert QueryRDD to RDD of Row
-   * @return RDD of Rows
-   */
-  def toRowRDD(): RDD[Row] = {
-    val rowRDD = queryRDD.map(row => {
-      row match {
-        case si: StructImpl => Row.fromSeq(si.getFieldValues)
-        case obj: Object => Row.fromSeq(Seq(obj))
-      }
-    })
-    rowRDD
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala
deleted file mode 100644
index 11fff90..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.oql
-
-import com.gemstone.gemfire.cache.query.internal.StructImpl
-import org.apache.spark.sql.types._
-import scala.collection.mutable.ListBuffer
-import org.apache.spark.Logging
-
-class SchemaBuilder[T](queryRDD: QueryRDD[T]) extends Logging {
-
-  val nullStructType = StructType(Nil)
-  
-  val typeMap:Map[Class[_], DataType] = Map( 
-    (classOf[java.lang.String], StringType),
-    (classOf[java.lang.Integer], IntegerType),
-    (classOf[java.lang.Short], ShortType),
-    (classOf[java.lang.Long], LongType),
-    (classOf[java.lang.Double], DoubleType),
-    (classOf[java.lang.Float], FloatType),
-    (classOf[java.lang.Boolean], BooleanType),
-    (classOf[java.lang.Byte], ByteType),
-    (classOf[java.util.Date], DateType),
-    (classOf[java.lang.Object], nullStructType)
-  )
-  
-  /**
-   * Analyse QueryRDD to get the Spark schema
-   * @return The schema represented by Spark StructType
-   */
-  def toSparkSchema(): StructType = {
-    val row = queryRDD.first()
-    val tpe = row match {
-      case r: StructImpl => constructFromStruct(r)
-      case null => StructType(StructField("col1", NullType) :: Nil)
-      case default => 
-        val value = typeMap.getOrElse(default.getClass(), nullStructType)
-        StructType(StructField("col1", value) :: Nil)
-    }
-    logInfo(s"Schema: $tpe")
-    tpe
-  }
-  
-  def constructFromStruct(r:StructImpl) = {
-    val names = r.getFieldNames
-    val values = r.getFieldValues
-    val lb = new ListBuffer[StructField]()
-    for (i <- 0 until names.length) {
-      val name = names(i)
-      val value = values(i)
-      val dataType = value match {
-        case null => NullType
-        case default => typeMap.getOrElse(default.getClass,  nullStructType)
-      }
-      lb += StructField(name, dataType)
-    }
-    StructType(lb.toSeq)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala
deleted file mode 100644
index 35e8120..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.oql
-
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Output, Input}
-import com.gemstone.gemfire.cache.query.QueryService
-import com.gemstone.gemfire.cache.query.internal.Undefined
-
-/**
- * This is the customized serializer to serialize QueryService.UNDEFINED,
- * i.e. com.gemstone.gemfire.cache.query.internal.Undefined, in order to
- * guarantee the singleton Undefined after its deserialization within Spark.
- */
-class UndefinedSerializer extends Serializer[Undefined] {
-
-  def write(kryo: Kryo, output: Output, u: Undefined) {
-    //Only serialize a byte for Undefined
-    output.writeByte(u.getDSFID)
-  }
-
-  def read (kryo: Kryo, input: Input, tpe: Class[Undefined]): Undefined = {
-    //Read DSFID of Undefined
-    input.readByte()
-    QueryService.UNDEFINED match {
-      case null => new Undefined
-      case _ =>
-        //Avoid calling Undefined constructor again.
-        QueryService.UNDEFINED.asInstanceOf[Undefined]
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala
deleted file mode 100644
index cfa1537..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.rdd
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
-import org.apache.spark.{TaskContext, Partition}
-import org.apache.spark.rdd.RDD
-import scala.collection.JavaConversions._
-
-/**
- * An `RDD[T, V]` that will represent the result of a join between `left` RDD[T]
- * and the specified GemFire Region[K, V].
- */
-class GemFireJoinRDD[T, K, V] private[connector]
-  ( left: RDD[T],
-    func: T => K,
-    val regionPath: String,
-    val connConf: GemFireConnectionConf
-  ) extends RDD[(T, V)](left.context, left.dependencies) {
-
-  /** validate region existence when GemFireRDD object is created */
-  validate()
-
-  /** Validate region, and make sure it exists. */
-  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
-
-  override protected def getPartitions: Array[Partition] = left.partitions
-
-  override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = {
-    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    if (func == null) computeWithoutFunc(split, context, region)
-    else computeWithFunc(split, context, region)
-  }
-
-  /** T is (K, V1) since there's no map function `func` */
-  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
-    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
-    val leftKeys = leftPairs.map { case (k, v) => k}.toSet
-    // Note: get all will return (key, null) for non-exist entry, so remove those entries
-    val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
-    leftPairs.filter{case (k, v) => rightPairs.contains(k)}
-             .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator
-  }
-  
-  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
-    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
-    val leftKeys = leftPairs.map { case (t, k) => k}.toSet
-    // Note: get all will return (key, null) for non-exist entry, so remove those entries
-    val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
-    leftPairs.filter { case (t, k) => rightPairs.contains(k)}.map {case (t, k) => (t, rightPairs.get(k).get)}.toIterator
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala
deleted file mode 100644
index ec3a512..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.rdd
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
-import org.apache.spark.{TaskContext, Partition}
-import org.apache.spark.rdd.RDD
-import scala.collection.JavaConversions._
-
-/**
- * An `RDD[ T, Option[V] ]` that represents the result of a left outer join 
- * between `left` RDD[T] and the specified GemFire Region[K, V].
- */
-class GemFireOuterJoinRDD[T, K, V] private[connector]
- ( left: RDD[T],
-   func: T => K,
-   val regionPath: String,
-   val connConf: GemFireConnectionConf
-  ) extends RDD[(T, Option[V])](left.context, left.dependencies) {
-
-  /** validate region existence when GemFireRDD object is created */
-  validate()
-
-  /** Validate region, and make sure it exists. */
-  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
-
-  override protected def getPartitions: Array[Partition] = left.partitions
-
-  override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = {
-    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    if (func == null) computeWithoutFunc(split, context, region)
-    else computeWithFunc(split, context, region)
-  }
-
-  /** T is (K1, V1), and K1 and K are the same type since there's no map function `func` */
-  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
-    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
-    val leftKeys = leftPairs.map { case (k, v) => k}.toSet
-    // Note: get all will return (key, null) for non-exist entry
-    val rightPairs = region.getAll(leftKeys)
-    // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option
-    leftPairs.map{ case (k, v) => ((k, v).asInstanceOf[T], Option(rightPairs.get(k))) }.toIterator
-  }
-
-  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
-    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
-    val leftKeys = leftPairs.map { case (t, k) => k}.toSet
-    // Note: get all will return (key, null) for non-exist entry
-    val rightPairs = region.getAll(leftKeys)
-    // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option
-    leftPairs.map{ case (t, k) => (t, Option(rightPairs.get(k)))}.toIterator
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala
deleted file mode 100644
index 02e1eed..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.rdd
-
-import org.apache.spark.Partition
-
-/**
- * This serializable class represents a GemFireRDD partition. Each partition is mapped
- * to one or more buckets of region. The GemFireRDD can materialize the data of the 
- * partition based on all information contained here.
- * @param partitionId partition id, a 0 based number.
- * @param bucketSet region bucket id set for this partition. Set.empty means whole
- *                  region (used for replicated region)
- * @param locations preferred location for this partition                  
- */
-case class GemFireRDDPartition (
-  partitionId: Int, bucketSet: Set[Int], locations: Seq[String] = Nil)
-  extends Partition  {
-  
-  override def index: Int = partitionId
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala
deleted file mode 100644
index 807a979..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.rdd
-
-import io.pivotal.gemfire.spark.connector.GemFireConnection
-import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
-import org.apache.spark.{Logging, Partition}
-
-import scala.reflect.ClassTag
-
-/**
- * A GemFireRDD partitioner is used to partition the region into multiple RDD partitions.
- */
-trait GemFireRDDPartitioner extends Serializable {
-
-  def name: String
-  
-  /** the function that generates partitions */
-  def partitions[K: ClassTag, V: ClassTag]
-    (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition]
-}
-
-object GemFireRDDPartitioner extends Logging {
-
-  /** To add new partitioner, just add it to the following list */
-  final val partitioners: Map[String, GemFireRDDPartitioner] =
-    List(OnePartitionPartitioner, ServerSplitsPartitioner).map(e => (e.name, e)).toMap
-
-  /**
-   * Get a partitioner based on given name, a default partitioner will be returned if there's
-   * no partitioner for the given name. 
-   */
-  def apply(name: String = defaultPartitionedRegionPartitioner.name): GemFireRDDPartitioner = {
-    val p = partitioners.get(name)
-    if (p.isDefined) p.get else {
-      logWarning(s"Invalid preferred partitioner name $name.")
-      defaultPartitionedRegionPartitioner
-    }
-  }
-
-  val defaultReplicatedRegionPartitioner = OnePartitionPartitioner
-
-  val defaultPartitionedRegionPartitioner = ServerSplitsPartitioner
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
deleted file mode 100644
index 72904ee..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.rdd
-
-import io.pivotal.gemfire.spark.connector.GemFireConnection
-import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
-import io.pivotal.gemfire.spark.connector.NumberPartitionsPerServerPropKey
-import org.apache.spark.Partition
-import scala.collection.JavaConversions._
-import scala.collection.immutable.SortedSet
-import scala.collection.mutable
-import scala.reflect.ClassTag
-
-/** This partitioner maps whole region to one GemFireRDDPartition */
-object OnePartitionPartitioner extends GemFireRDDPartitioner {
-
-  override val name = "OnePartition"
-
-  override def partitions[K: ClassTag, V: ClassTag]
-    (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] =
-    Array[Partition](new GemFireRDDPartition(0, Set.empty))
-}
-
-/**
-  * This partitioner maps whole region to N * M GemFire RDD partitions, where M is the number of 
-  * GemFire servers that contain the data for the given region. Th default value of N is 1.
-  */
-object ServerSplitsPartitioner extends GemFireRDDPartitioner {
-
-  override val name = "ServerSplits"
-
-  override def partitions[K: ClassTag, V: ClassTag]
-  (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = {
-    if (md == null) throw new RuntimeException("RegionMetadata is null")
-    val n = try { env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt } catch { case e: NumberFormatException => 2 }
-    if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty)
-      Array[Partition](new GemFireRDDPartition(0, Set.empty))
-    else {
-      val map = mapAsScalaMap(md.getServerBucketMap)
-        .map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList
-        .map { case (srv, set) => (srv.getHostName, set) }
-       doPartitions(map, md.getTotalBuckets, n)
-    }
-  }
-
-  /** Converts server to bucket ID set list to array of RDD partitions */
-  def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int)
-    : Array[Partition] = {
-
-    // method that calculates the group size for splitting "k" items into "g" groups
-    def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt
-
-    // 1. convert list of server and bucket set pairs to a list of server and sorted bucket set pairs
-    val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) }
-
-    // 2. split bucket set of each server into n splits if possible, and server to Seq(server)
-    val srvToSplitedBuckeSet = srvToSortedBucketSet.flatMap { case (host, set) =>
-      if (set.isEmpty) Nil else set.grouped(groupSize(set.size, n)).toList.map(s => (Seq(host), s)) }
-
-    // 3. calculate empty bucket IDs by removing all bucket sets of all servers from the full bucket sets
-    val emptyIDs = SortedSet[Int]() ++ ((0 until totalBuckets).toSet /: srvToSortedBucketSet) {case (s1, (k, s2)) => s1 &~ s2}
-
-    // 4. distribute empty bucket IDs to all partitions evenly.
-    //    The empty buckets do not contain data when partitions are created, but they may contain data
-    //    when RDD is materialized, so need to include those bucket IDs in the partitions.
-    val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitedBuckeSet
-      else srvToSplitedBuckeSet.zipAll(
-        emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitedBuckeSet.size)).toList, (Nil, Set.empty), Set.empty).map
-          { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) }
-
-    // 5. create array of partitions w/ 0-based index
-    (0 until srvToFinalBucketSet.size).toList.zip(srvToFinalBucketSet).map
-      { case (i, (srv, set)) => new GemFireRDDPartition(i, set, srv) }.toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
deleted file mode 100644
index b6b1330..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.rdd
-
-import com.gemstone.gemfire.cache.Region
-import io.pivotal.gemfire.spark.connector._
-import org.apache.spark.{Logging, TaskContext}
-
-import scala.collection.Iterator
-import java.util.{HashMap => JMap}
-
-/** This trait provide some common code for pair and non-pair RDD writer */
-private[rdd] abstract class GemFireRDDWriterBase (opConf: Map[String, String]) extends Serializable {
-
-  val batchSize = try { opConf.getOrElse(RDDSaveBatchSizePropKey, RDDSaveBatchSizeDefault.toString).toInt}
-                  catch { case e: NumberFormatException => RDDSaveBatchSizeDefault }
-
-  def mapDump(map: Map[_, _], num: Int): String = {
-    val firstNum = map.take(num + 1)
-    if (firstNum.size > num) s"$firstNum ..." else s"$firstNum"    
-  }  
-}
-
-/**
- * Writer object that provides write function that saves non-pair RDD partitions to GemFire.
- * Those functions will be executed on Spark executors.
- * @param regionPath the full path of the region where the data is written to
- */
-class GemFireRDDWriter[T, K, V] 
-  (regionPath: String, connConf: GemFireConnectionConf, opConf: Map[String, String] = Map.empty)
-  extends GemFireRDDWriterBase(opConf) with Serializable with Logging {
-
-  def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = {
-    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    var count = 0
-    val chunks = data.grouped(batchSize)
-    chunks.foreach { chunk =>
-      val map = chunk.foldLeft(new JMap[K, V]()){case (m, t) => val (k, v) = func(t); m.put(k, v); m}
-      region.putAll(map)
-      count += chunk.length
-    }
-    logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath")
-  }
-}
-
-
-/**
- * Writer object that provides write function that saves pair RDD partitions to GemFire.
- * Those functions will be executed on Spark executors.
- * @param regionPath the full path of the region where the data is written to
- */
-class GemFirePairRDDWriter[K, V]
-  (regionPath: String, connConf: GemFireConnectionConf, opConf: Map[String, String] = Map.empty)
-  extends GemFireRDDWriterBase(opConf) with Serializable with Logging {
-
-  def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = {
-    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    var count = 0
-    val chunks = data.grouped(batchSize)
-    chunks.foreach { chunk =>
-      val map = chunk.foldLeft(new JMap[K, V]()){case (m, (k,v)) => m.put(k,v); m}
-      region.putAll(map)
-      count += chunk.length
-    }
-    logDebug(s"$count entries (batch.batch = $batchSize) are saved to region $regionPath")
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
deleted file mode 100644
index 4535917..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.internal.rdd
-
-import scala.collection.Seq
-import scala.reflect.ClassTag
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, Partition, SparkContext}
-import io.pivotal.gemfire.spark.connector.{GemFireConnectionConf, PreferredPartitionerPropKey}
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._
-
-/**
- * This class exposes GemFire region as a RDD.
- * @param sc the Spark Context
- * @param regionPath the full path of the region
- * @param connConf the GemFireConnectionConf to access the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
-class GemFireRegionRDD[K, V] private[connector]
-  (@transient sc: SparkContext,
-   val regionPath: String,
-   val connConf: GemFireConnectionConf,
-   val opConf: Map[String, String] = Map.empty,
-   val whereClause: Option[String] = None 
-  ) (implicit ctk: ClassTag[K], ctv: ClassTag[V])
-  extends RDD[(K, V)](sc, Seq.empty) {
-
-  /** validate region existence when GemFireRDD object is created */
-  validate()
-
-  /** Validate region, and make sure it exists. */
-  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
-
-  def kClassTag = ctk
-  
-  def vClassTag = ctv
-
-  /**
-   * method `copy` is used by method `where` that creates new immutable
-   * GemFireRDD instance based this instance.
-   */
-  private def copy(
-    regionPath: String = regionPath,
-    connConf: GemFireConnectionConf = connConf,
-    opConf: Map[String, String] = opConf,
-    whereClause: Option[String] = None
-  ): GemFireRegionRDD[K, V] = {
-
-    require(sc != null,
-    """RDD transformation requires a non-null SparkContext. Unfortunately
-      |SparkContext in this GemFireRDD is null. This can happen after 
-      |GemFireRDD has been deserialized. SparkContext is not Serializable,
-      |therefore it deserializes to null. RDD transformations are not allowed
-      |inside lambdas used in other RDD transformations.""".stripMargin )
-
-    new GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf, whereClause)
-  }
-
-  /** When where clause is specified, OQL query
-    * `select key, value from /<region-path>.entries where <where clause> `
-    * is used to filter the dataset.
-    */
-  def where(whereClause: Option[String]): GemFireRegionRDD[K, V] = {
-    if (whereClause.isDefined) copy(whereClause = whereClause)
-    else this
-  }
-
-  /** this version is for Java API that doesn't use scala.Option */
-  def where(whereClause: String): GemFireRegionRDD[K, V] = {
-    if (whereClause == null || whereClause.trim.isEmpty) this
-    else copy(whereClause = Option(whereClause.trim))
-  }
-
-  /**
-   * Use preferred partitioner generate partitions. `defaultReplicatedRegionPartitioner`
-   * will be used if it's a replicated region. 
-   */
-  override def getPartitions: Array[Partition] = {
-    val conn = connConf.getConnection
-    val md = conn.getRegionMetadata[K, V](regionPath)
-    md match {
-      case None => throw new RuntimeException(s"region $regionPath was not found.")
-      case Some(data) =>
-        logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""")
-        val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner
-        val splits = p.partitions[K, V](conn, data, opConf)
-        logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n  ${splits.mkString("\n  ")}""")
-        splits
-    }
-  }
-
-  /**
-   * provide preferred location(s) (host name(s)) of the given partition. 
-   * Only some partitioner implementation(s) provides this info, which is
-   * useful when Spark cluster and GemFire cluster share some hosts.
-   */
-  override def getPreferredLocations(split: Partition) =
-    split.asInstanceOf[GemFireRDDPartition].locations
-
-  /**
-   * Get preferred partitioner. return `defaultPartitionedRegionPartitioner` if none
-   * preference is specified. 
-   */
-  private def preferredPartitioner = 
-    GemFireRDDPartitioner(opConf.getOrElse(
-      PreferredPartitionerPropKey, GemFireRDDPartitioner.defaultPartitionedRegionPartitioner.name))
-
-  /** materialize a RDD partition */
-  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val partition = split.asInstanceOf[GemFireRDDPartition]
-    logDebug(s"compute RDD id=${this.id} partition $partition")
-    connConf.getConnection.getRegionData[K,V](regionPath, whereClause, partition)
-    // new InterruptibleIterator(context, split.asInstanceOf[GemFireRDDPartition[K, V]].iterator)
-  }
-}
-
-object GemFireRegionRDD {
-
-  def apply[K: ClassTag, V: ClassTag](sc: SparkContext, regionPath: String,
-    connConf: GemFireConnectionConf, opConf: Map[String, String] = Map.empty)
-    : GemFireRegionRDD[K, V] =
-    new GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala
deleted file mode 100644
index 1a2e8f2..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi
-
-import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD
-import org.apache.spark.api.java.JavaPairRDD
-
-class GemFireJavaRegionRDD[K, V](rdd: GemFireRegionRDD[K, V]) extends JavaPairRDD[K, V](rdd)(rdd.kClassTag, rdd.vClassTag) {
-  
-  def where(whereClause: String): GemFireJavaRegionRDD[K, V] = new GemFireJavaRegionRDD(rdd.where(whereClause))
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
deleted file mode 100644
index 3d10fb8..0000000
--- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.gemfire.spark.connector.javaapi
-
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
-import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream}
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-/**
- *  A helper class to make it possible to access components written in Scala from Java code.
- */
-private[connector] object JavaAPIHelper {
-
-  /** Returns a `ClassTag` of a given runtime class. */
-  def getClassTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz)
-
-  /**
-   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
-   * see JavaSparkContext.fakeClassTag in Spark for more info.
-   */
-  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-
-  /** Converts a Java `Properties` to a Scala immutable `Map[String, String]`. */
-  def propertiesToScalaMap[K, V](props: java.util.Properties): Map[String, String] =
-    Map(props.toSeq: _*)
-
-  /** convert a JavaRDD[(K,V)] to JavaPairRDD[K,V] */
-  def toJavaPairRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] =
-    JavaPairRDD.fromJavaRDD(rdd)
-
-  /** convert a JavaDStream[(K,V)] to JavaPairDStream[K,V] */
-  def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] =
-    JavaPairDStream.fromJavaDStream(ds)
-
-  /** an empty Map[String, String] for default opConf **/
-  val emptyStrStrMap: Map[String, String] = Map.empty
-}


Mime
View raw message