geode-commits mailing list archives

From kl...@apache.org
Subject [04/50] [abbrv] geode git commit: GEODE-194: Remove spark connector
Date Tue, 13 Jun 2017 00:29:39 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeRunner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeRunner.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeRunner.scala
deleted file mode 100644
index f2f5d06..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeRunner.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector.testkit
-
-import java.io.{IOException, File}
-import java.net.InetAddress
-import java.util.Properties
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.io.FileUtils
-import org.apache.commons.io.filefilter.IOFileFilter
-
-/**
- * A class that manages Geode locator and servers.  Uses gfsh to
- * start and stop the locator and servers.
- */
-class GeodeRunner(settings: Properties) {
-  val gfshCmd = new File(getCurrentDirectory, "../../geode-assembly/build/install/apache-geode/bin/gfsh").toString
-  val cacheXMLFile = settings.get("cache-xml-file")
-  val numServers: Int = settings.get("num-of-servers").asInstanceOf[String].toInt
-  val cwd = new File(".").getAbsolutePath
-  val geodeFunctionsTargetDir = new File("../geode-functions/target")
-  val testroot = "target/testgeode"
-  val classpath = new File(cwd, "target/scala-2.10/it-classes/")
-  val locatorPort = startGeodeCluster(numServers)
-
-  def getLocatorPort: Int = locatorPort
-
-  private def getCurrentDirectory = new File( "." ).getCanonicalPath
-  
-  private def startGeodeCluster(numServers: Int): Int = {
-    // ports(0) is for the Geode locator, ports(1) for the JMX manager HTTP port, and the rest for the Geode servers
-    val ports: Seq[Int] = IOUtils.getRandomAvailableTCPPorts(2 + numServers)
-    startGeodeLocator(ports(0), ports(1))
-    startGeodeServers(ports(0), ports.drop(2))
-    registerFunctions(ports(1))
-    ports(0)
-  }
-
-  private def startGeodeLocator(locatorPort: Int, jmxHttpPort:Int) {
-    println(s"=== GeodeRunner: starting locator on port $locatorPort")
-    val locatorDir = new File(cwd, s"$testroot/locator")
-    if (locatorDir.exists())
-      FileUtils.deleteDirectory(locatorDir)
-    IOUtils.mkdir(locatorDir)
-    new ProcessBuilder()
-      .command(gfshCmd, "start", "locator",
-        "--name=locator",
-        s"--dir=$locatorDir",
-        s"--port=$locatorPort",
-        s"--J=-Dgemfire.jmx-manager-http-port=$jmxHttpPort")
-      .inheritIO()
-      .start()
-
-    // Wait up to 30 seconds for the locator to start
-    println(s"=== GeodeRunner: waiting for locator on port $locatorPort")
-    if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000))
-      throw new IOException("Failed to start Geode locator.")
-    println(s"=== GeodeRunner: done waiting for locator on port $locatorPort")
-  }
-
-  private def startGeodeServers(locatorPort: Int, serverPorts: Seq[Int]) {
-    val procs = for (i <- 0 until serverPorts.length) yield {
-      println(s"=== GeodeRunner: starting server${i+1} with clientPort ${serverPorts(i)}")
-      val serverDir = new File(cwd, s"$testroot/server${i+1}")
-      if (serverDir.exists())
-        FileUtils.deleteDirectory(serverDir)
-      IOUtils.mkdir(serverDir)
-      new ProcessBuilder()
-        .command(gfshCmd, "start", "server",
-          s"--name=server${i+1}",
-          s"--locators=localhost[$locatorPort]",
-          s"--bind-address=localhost",
-          s"--server-port=${serverPorts(i)}",
-          s"--dir=$serverDir",
-          s"--cache-xml-file=$cacheXMLFile",
-          s"--classpath=$classpath")
-        .inheritIO()
-        .start()
-    }
-    procs.foreach(p => p.waitFor)
-    println(s"All $serverPorts.length servers have been started") 
-  }
-  
-  private def registerFunctions(jmxHttpPort:Int) {
-    import scala.collection.JavaConversions._
-    FileUtils.listFiles(geodeFunctionsTargetDir, fileFilter, dirFilter).foreach{  f => registerFunction(jmxHttpPort, f)}
-  }
-  
-  def fileFilter = new IOFileFilter {
-    def accept (file: File) = file.getName.endsWith(".jar") && file.getName.startsWith("geode-functions")
-    def accept (dir: File, name: String) = name.endsWith(".jar") && name.startsWith("geode-functions")
-  }
-  
-  def dirFilter = new IOFileFilter {
-    def accept (file: File) = file.getName.startsWith("scala")
-    def accept (dir: File, name: String) = name.startsWith("scala")
-  }
-  
-  private def registerFunction(jmxHttpPort:Int, jar:File) {
-    println("Deploying:" + jar.getName)
-    import org.apache.geode.spark.connector.GeodeFunctionDeployer
-    val deployer = new GeodeFunctionDeployer(new HttpClient())
-    deployer.deploy("localhost", jmxHttpPort, jar)
-  }
-
-  def stopGeodeCluster(): Unit = {
-    stopGeodeServers(numServers)
-    stopGeodeLocator()
-    if (!IOUtils.waitForPortClose(InetAddress.getByName("localhost"), getLocatorPort, 30000))
-      throw new IOException(s"Failed to stop Geode locator at port $getLocatorPort.")
-    println(s"Successfully stop Geode locator at port $getLocatorPort.")
-  }
-
-  private def stopGeodeLocator() {
-    println(s"=== GeodeRunner: stop locator")
-    val p = new ProcessBuilder()
-      .inheritIO()
-      .command(gfshCmd, "stop", "locator", s"--dir=$testroot/locator")
-      .start()
-     p.waitFor()
-  }
-
-  private def stopGeodeServers(numServers: Int) {
-   val procs = for (i <-1 to numServers) yield {
-       println(s"=== GeodeRunner: stop server $i.")
-       new ProcessBuilder()
-        .inheritIO()
-        .command(gfshCmd, "stop", "server", s"--dir=$testroot/server$i")
-        .start()
-   }
-   procs.foreach(p => p.waitFor())
-  }
-
-}

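For context on what is being removed: GeodeRunner stood up a gfsh-managed locator plus N servers for the connector's integration tests. A minimal usage sketch in Scala (the property values and the cache XML path are hypothetical):

    import java.util.Properties
    import ittest.org.apache.geode.spark.connector.testkit.GeodeRunner

    val props = new Properties()
    props.setProperty("cache-xml-file", "src/it/resources/test-regions.xml") // hypothetical path
    props.setProperty("num-of-servers", "2")

    val runner = new GeodeRunner(props)  // starts the locator and two servers via gfsh
    val port = runner.getLocatorPort     // random locator port chosen at startup
    // ... run integration tests against localhost[port] ...
    runner.stopGeodeCluster()            // stops the servers, then the locator
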
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/IOUtils.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/IOUtils.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/IOUtils.scala
deleted file mode 100644
index 2ac4257..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/IOUtils.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector.testkit
-
-import java.io.{File, IOException}
-import java.net.{InetAddress, Socket}
-import org.apache.geode.internal.AvailablePort
-import scala.util.Try
-import org.apache.log4j.PropertyConfigurator
-import java.util.Properties
-
-object IOUtils {
-
-  /** Makes a new directory or throws an `IOException` if it cannot be made */
-  def mkdir(dir: File): File = {
-    if (!dir.mkdirs())
-      throw new IOException(s"Could not create dir $dir")
-    dir
-  }
-
-  private def socketPortProb(host: InetAddress, port: Int) = Iterator.continually {
-    Try {
-      Thread.sleep(100)
-      new Socket(host, port).close()
-    }
-  }
-  
-  /**
-   * Waits until a port at the given address is open or the timeout passes.
-   * @return true if it managed to connect to the port, false if the timeout elapsed first
-   */
-  def waitForPortOpen(host: InetAddress, port: Int, timeout: Long): Boolean = {
-    val startTime = System.currentTimeMillis()
-    socketPortProb(host, port)
-      .dropWhile(p => p.isFailure && System.currentTimeMillis() - startTime < timeout)
-      .next()
-      .isSuccess
-  }
-
-  /**
-   * Waits until a port at the given address is closed or the timeout passes.
-   * @return true if host:port became un-connectable, false if the timeout elapsed first
-   */
-  def waitForPortClose(host: InetAddress, port: Int, timeout: Long): Boolean = {
-    val startTime = System.currentTimeMillis()
-    socketPortProb(host, port)
-      .dropWhile(p => p.isSuccess && System.currentTimeMillis() - startTime < timeout)
-      .next()
-      .isFailure
-  }
-
-  /**
-   * Returns the specified count of unique, randomly chosen available TCP ports.
-   */
-  def getRandomAvailableTCPPorts(count: Int): Seq[Int] =
-    (0 until count).map(x => AvailablePort.getRandomAvailablePortKeeper(AvailablePort.SOCKET))
-      .map{x => x.release(); x.getPort}.toArray
-
-  /**
-   * Configures log4j properties used for integration tests.
-   */
-  def configTestLog4j(level: String, props: (String, String)*): Unit = {
-    val pro = new Properties()
-    props.foreach(p => pro.put(p._1, p._2))
-    configTestLog4j(level, pro)
-  }
-
-  def configTestLog4j(level: String, props: Properties): Unit = {
-    val pro = new Properties()
-    pro.put("log4j.rootLogger", s"$level, console")
-    pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
-    pro.put("log4j.appender.console.target", "System.err")
-    pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
-    pro.put("log4j.appender.console.layout.ConversionPattern",
-      "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
-    pro.putAll(props)
-    PropertyConfigurator.configure(pro)
-    
-  }
-}

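The port helpers above formed GeodeRunner's startup handshake; a brief sketch of how they compose (timeouts are in milliseconds):

    import java.net.InetAddress
    import ittest.org.apache.geode.spark.connector.testkit.IOUtils

    val Seq(locatorPort, jmxHttpPort) = IOUtils.getRandomAvailableTCPPorts(2)
    // ... launch a process that should bind locatorPort ...
    if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000))
      sys.error(s"port $locatorPort did not open within 30s")
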
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
deleted file mode 100644
index 67f9e57..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming
-
-import org.apache.spark.util.ManualClock
-
-object ManualClockHelper {
-
-  def addToTime(ssc: StreamingContext, timeToAdd: Long): Unit = {
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    clock.advance(timeToAdd)
-  }
-  
-}

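ManualClockHelper only makes sense when the StreamingContext is driven by Spark's ManualClock; a hedged sketch (the spark.streaming.clock setting is Spark's test-time clock switch; the batch math is illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{ManualClockHelper, Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[2]").setAppName("manual-clock-demo")
      .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
    val ssc = new StreamingContext(conf, Seconds(1))
    // ... wire up DStreams, then start and drive time by hand:
    ssc.start()
    ManualClockHelper.addToTime(ssc, 3000) // fire three 1-second batches at once
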
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
deleted file mode 100644
index fce1e67..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.reflect.ClassTag
-
-class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
-  extends InputDStream[T](ssc_) {
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[T]] = {
-    logInfo("Computing RDD for time " + validTime)
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    val selectedInput = if (index < input.size) input(index) else Seq[T]()
-
-    // lets us test cases where RDDs are not created
-    if (selectedInput == null)
-      return None
-
-    val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
-    logInfo("Created RDD " + rdd.id + " with " + selectedInput)
-    Some(rdd)
-  }
-}

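TestInputDStream replayed one inner Seq of `input` per batch interval, and a null entry produced no RDD for that batch. A small sketch of the intended use (the batch contents are hypothetical):

    import org.apache.spark.streaming.{StreamingContext, TestInputDStream}

    def wire(ssc: StreamingContext): Unit = {
      val batches = Seq(Seq("a", "b"), Seq("c"), null) // third batch yields no RDD
      val stream = new TestInputDStream(ssc, batches, numPartitions = 2)
      stream.foreachRDD(rdd => println(s"batch with ${rdd.count()} elements"))
    }
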
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
deleted file mode 100644
index fcf91c2..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaDStreamFunctions.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector.javaapi;
-
-import org.apache.geode.spark.connector.GeodeConnectionConf;
-import org.apache.geode.spark.connector.streaming.GeodeDStreamFunctions;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import java.util.Properties;
-
-import static org.apache.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream}
- * to provide Geode Spark Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * org.apache.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */ 
-public class GeodeJavaDStreamFunctions<T> {
-  
-  public final GeodeDStreamFunctions<T> dsf;
-
-  public GeodeJavaDStreamFunctions(JavaDStream<T> ds) {
-    this.dsf = new GeodeDStreamFunctions<T>(ds.dstream());
-  }
-
-  /**
-   * Save the JavaDStream to Geode key-value store.
-   * @param regionPath the full path of the region in which the DStream is stored
-   * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the optional parameters for this operation
-   */
-  public <K, V> void saveToGeode(
-    String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
-    dsf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
-  }
-
-  /**
-   * Save the JavaDStream to Geode key-value store.
-   * @param regionPath the full path of the region in which the DStream is stored
-   * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
-   * @param opConf the optional parameters for this operation
-   */
-  public <K, V> void saveToGeode(
-          String regionPath, PairFunction<T, K, V> func, Properties opConf) {
-    dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
-  }
-
-  /**
-   * Save the JavaDStream to Geode key-value store.
-   * @param regionPath the full path of the region in which the DStream is stored
-   * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   */
-  public <K, V> void saveToGeode(
-          String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
-    dsf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
-  }
-
-  /**
-   * Save the JavaDStream to Geode key-value store.
-   * @param regionPath the full path of the region in which the DStream is stored
-   * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
-   */
-  public <K, V> void saveToGeode(
-          String regionPath, PairFunction<T, K, V> func) {
-    dsf.saveToGeode(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap());
-  }
-
-}

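Together these overloads let callers choose any combination of explicit connection conf and operation properties. The simplest form, called from Scala through the Java API (the region path and key mapping are hypothetical):

    import org.apache.geode.spark.connector.javaapi.GeodeJavaUtil
    import org.apache.spark.api.java.function.PairFunction
    import org.apache.spark.streaming.api.java.JavaDStream

    def save(words: JavaDStream[String]): Unit =
      GeodeJavaUtil.javaFunctions(words).saveToGeode(
        "/str_int_region", // hypothetical region path
        new PairFunction[String, String, Int] {
          override def call(w: String): (String, Int) = (w, w.length)
        })
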
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
deleted file mode 100644
index 479f3e8..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairDStreamFunctions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector.javaapi;
-
-import org.apache.geode.spark.connector.GeodeConnectionConf;
-import org.apache.geode.spark.connector.streaming.GeodePairDStreamFunctions;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import java.util.Properties;
-
-import static org.apache.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream}
- * to provide Geode Spark Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * org.apache.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaPairDStreamFunctions<K, V> {
-  
-  public final GeodePairDStreamFunctions<K, V> dsf;
-
-  public GeodeJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) {    
-    this.dsf = new GeodePairDStreamFunctions<K, V>(ds.dstream());
-  }
-
-  /**
-   * Save the JavaPairDStream to Geode key-value store.
-   * @param regionPath the full path of the region in which the DStream is stored
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the optional parameters for this operation
-   */  
-  public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
-    dsf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
-  }
-
-  /**
-   * Save the JavaPairDStream to Geode key-value store.
-   * @param regionPath the full path of the region in which the DStream is stored
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   */
-  public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
-    dsf.saveToGeode(regionPath, connConf, emptyStrStrMap());
-  }
-
-  /**
-   * Save the JavaPairDStream to Geode key-value store.
-   * @param regionPath the full path of the region in which the DStream is stored
-   * @param opConf the optional parameters for this operation
-   */
-  public void saveToGeode(String regionPath, Properties opConf) {
-    dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf));
-  }
-
-  /**
-   * Save the JavaPairDStream to Geode key-value store.
-   * @param regionPath the full path of the region in which the DStream is stored
-   */
-  public void saveToGeode(String regionPath) {
-    dsf.saveToGeode(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap());
-  }
-
-}

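For a JavaPairDStream the key/value mapping is already fixed, so only the region path and the optional confs vary; a minimal sketch (region path hypothetical):

    import org.apache.geode.spark.connector.javaapi.GeodeJavaUtil
    import org.apache.spark.streaming.api.java.JavaPairDStream

    def save(counts: JavaPairDStream[String, Integer]): Unit =
      GeodeJavaUtil.javaFunctions(counts).saveToGeode("/word_counts") // hypothetical region
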
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
deleted file mode 100644
index 52d6eec..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector.javaapi;
-
-import org.apache.geode.spark.connector.GeodeConnectionConf;
-import org.apache.geode.spark.connector.GeodePairRDDFunctions;
-import org.apache.geode.spark.connector.internal.rdd.GeodeJoinRDD;
-import org.apache.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import scala.Option;
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import java.util.Properties;
-
-import static org.apache.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide Geode Spark
- * Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * org.apache.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaPairRDDFunctions<K, V> {
-
-  public final GeodePairRDDFunctions<K, V> rddf;
-
-  public GeodeJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
-    this.rddf = new GeodePairRDDFunctions<K, V>(rdd.rdd());
-  }
-
-  /**
-   * Save the pair RDD to Geode key-value store.
-   * @param regionPath the full path of the region in which the RDD is stored
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the parameters for this operation
-   */
-  public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
-    rddf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
-  }
-
-  /**
-   * Save the pair RDD to Geode key-value store.
-   * @param regionPath the full path of the region in which the RDD is stored
-   * @param opConf the parameters for this operation
-   */
-  public void saveToGeode(String regionPath, Properties opConf) {
-    rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
-  }
-
-  /**
-   * Save the pair RDD to Geode key-value store.
-   * @param regionPath the full path of the region in which the RDD is stored
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   */
-  public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
-    rddf.saveToGeode(regionPath, connConf, emptyStrStrMap());
-  }
-
-  /**
-   * Save the pair RDD to Geode key-value store with the default GeodeConnector.
-   * @param regionPath the full path of the region in which the RDD is stored
-   */
-  public void saveToGeode(String regionPath) {
-    rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap());
-  }
-
-  /**
-   * Return a JavaPairRDD containing all pairs of elements with matching keys in
-   * this RDD&lt;K, V> and the Geode `Region&lt;K, V2>`. Each pair of elements
-   * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
-   * (k, v2) is in the Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param <V2> the value type of the Geode region
-   * @return JavaPairRDD&lt;&lt;K, V>, V2>
-   */  
-  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(String regionPath) {
-    return joinGeodeRegion(regionPath, rddf.defaultConnectionConf());
-  }
-
-  /**
-   * Return a JavaPairRDD containing all pairs of elements with matching keys in
-   * this RDD&lt;K, V> and the Geode `Region&lt;K, V2>`. Each pair of elements
-   * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
-   * (k, v2) is in the Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param <V2> the value type of the Geode region
-   * @return JavaPairRDD&lt;&lt;K, V>, V2>
-   */
-  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
-    String regionPath, GeodeConnectionConf connConf) {
-    GeodeJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGeodeRegion(regionPath, connConf);
-    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
-    ClassTag<V2> vt = fakeClassTag();
-    return new JavaPairRDD<>(rdd, kt, vt);
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in this
-   * RDD&lt;K, V> and the Geode `Region&lt;K2, V2>`. The join key from RDD
-   * element is generated by `func(K, V) => K2`, and the key from the Geode
-   * region is just the key of the key/value pair.
-   *
-   * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
-   * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element (K, V)
-   * @param <K2> the key type of the Geode region
-   * @param <V2> the value type of the Geode region
-   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
-   */
-  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
-    String regionPath, Function<Tuple2<K, V>, K2> func) {
-    return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in this
-   * RDD&lt;K, V> and the Geode `Region&lt;K2, V2>`. The join key from RDD 
-   * element is generated by `func(K, V) => K2`, and the key from the Geode 
-   * region is just the key of the key/value pair.
-   *
-   * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
-   * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element (K, V)
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param <K2> the key type of the Geode region
-   * @param <V2> the value type of the Geode region
-   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
-   */  
-  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
-    String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
-    GeodeJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
-    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
-    ClassTag<V2> vt = fakeClassTag();
-    return new JavaPairRDD<>(rdd, kt, vt);
-  }
-
-  /**
-   * Perform a left outer join of this RDD&lt;K, V> and the Geode `Region&lt;K, V2>`.
-   * For each element (k, v) in this RDD, the resulting RDD will either contain
-   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
-   * ((k, v), None) if no element in the Geode region has key k.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param <V2> the value type of the Geode region
-   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
-   */
-  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(String regionPath) {
-    return outerJoinGeodeRegion(regionPath, rddf.defaultConnectionConf());
-  }
-
-  /**
-   * Perform a left outer join of this RDD&lt;K, V> and the Geode `Region&lt;K, V2>`.
-   * For each element (k, v) in this RDD, the resulting RDD will either contain
-   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
-   * ((k, v), None) if no element in the Geode region has key k.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param <V2> the value type of the Geode region
-   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
-   */  
-  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
-    String regionPath, GeodeConnectionConf connConf) {
-    GeodeOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, connConf);
-    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
-    ClassTag<Option<V2>> vt = fakeClassTag();
-    return new JavaPairRDD<>(rdd, kt, vt);
-  }
-
-  /**
-   * Perform a left outer join of this RDD&lt;K, V> and the Geode `Region&lt;K2, V2>`.
-   * The join key from RDD element is generated by `func(K, V) => K2`, and the
-   * key from region is just the key of the key/value pair.
-   *
-   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
-   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
-   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element (K, V)
-   * @param <K2> the key type of the Geode region
-   * @param <V2> the value type of the Geode region
-   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
-   */
-  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
-    String regionPath, Function<Tuple2<K, V>, K2> func) {
-    return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
-  }
-
-  /**
-   * Perform a left outer join of this RDD&lt;K, V> and the Geode `Region&lt;K2, V2>`.
-   * The join key from RDD element is generated by `func(K, V) => K2`, and the
-   * key from region is just the key of the key/value pair.
-   *
-   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
-   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
-   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element (K, V)
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param <K2> the key type of the Geode region
-   * @param <V2> the value type of the Geode region
-   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V>>
-   */
-  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
-    String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
-    GeodeOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
-    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
-    ClassTag<Option<V2>> vt = fakeClassTag();
-    return new JavaPairRDD<>(rdd, kt, vt);
-  }
-
-}

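All of the join variants above yield ((k, v), v2)-shaped results; a compact sketch of the two simplest overloads (region paths and value types hypothetical):

    import org.apache.geode.spark.connector.javaapi.GeodeJavaUtil
    import org.apache.spark.api.java.JavaPairRDD

    def demo(orders: JavaPairRDD[String, Integer]): Unit = {
      val fns = GeodeJavaUtil.javaFunctions(orders)
      fns.saveToGeode("/orders")                                  // hypothetical region
      val joined = fns.joinGeodeRegion[String]("/customers")      // ((k, v), v2)
      val outer  = fns.outerJoinGeodeRegion[String]("/customers") // ((k, v), Option(v2))
      println(s"${joined.count()} matched of ${outer.count()} total")
    }
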
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
deleted file mode 100644
index be60ede..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector.javaapi;
-
-import org.apache.geode.spark.connector.GeodeConnectionConf;
-import org.apache.geode.spark.connector.GeodeRDDFunctions;
-import org.apache.geode.spark.connector.internal.rdd.GeodeJoinRDD;
-import org.apache.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Option;
-import scala.reflect.ClassTag;
-
-import java.util.Properties;
-
-import static org.apache.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-/**
- * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide Geode Spark
- * Connector functionality.
- *
- * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * org.apache.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaRDDFunctions<T> {
-
-  public final GeodeRDDFunctions<T> rddf;
-
-  public GeodeJavaRDDFunctions(JavaRDD<T> rdd) {
-    this.rddf = new GeodeRDDFunctions<T>(rdd.rdd());
-  }
-
-  /**
-   * Save the non-pair RDD to Geode key-value store.
-   * @param regionPath the full path of the region in which the RDD is stored
-   * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the parameters for this operation
-   */  
-  public <K, V> void saveToGeode(
-    String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
-    rddf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
-  }
-
-  /**
-   * Save the non-pair RDD to Geode key-value store.
-   * @param regionPath the full path of the region in which the RDD is stored
-   * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   */
-  public <K, V> void saveToGeode(
-    String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
-    rddf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
-  }
-
-  /**
-   * Save the non-pair RDD to Geode key-value store.
-   * @param regionPath the full path of the region in which the RDD is stored
-   * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
-   * @param opConf the parameters for this operation
-   */
-  public <K, V> void saveToGeode(
-    String regionPath, PairFunction<T, K, V> func, Properties opConf) {
-    rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
-  }
-
-  /**
-   * Save the non-pair RDD to Geode key-value store with default GeodeConnector.
-   * @param regionPath the full path of the region in which the RDD is stored
-   * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
-   */
-  public <K, V> void saveToGeode(String regionPath, PairFunction<T, K, V> func) {
-    rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap());
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in this
-   * RDD&lt;T> and the Geode `Region&lt;K, V>`. The join key from RDD
-   * element is generated by `func(T) => K`, and the key from the Geode
-   * region is just the key of the key/value pair.
-   *
-   * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
-   * where t is from this RDD and v2 is from the Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element T
-   * @param <K> the key type of the Geode region
-   * @param <V> the value type of the Geode region
-   * @return JavaPairRDD&lt;T, V>
-   */
-  public <K, V> JavaPairRDD<T, V> joinGeodeRegion(String regionPath, Function<T, K> func) {
-    return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in this
-   * RDD&lt;T> and the Geode `Region&lt;K, V>`. The join key from RDD
-   * element is generated by `func(T) => K`, and the key from the Geode
-   * region is just the key of the key/value pair.
-   *
-   * Each pair of elements of result RDD will be returned as a (t, v2) tuple,
-   * where t is from this RDD and v2 is from the Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element T
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param <K> the key type of the Geode region
-   * @param <V> the value type of the Geode region
-   * @return JavaPairRDD&lt;T, V>
-   */
-  public <K, V> JavaPairRDD<T, V> joinGeodeRegion(
-    String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
-    GeodeJoinRDD<T, K, V> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
-    ClassTag<T> kt = fakeClassTag();
-    ClassTag<V> vt = fakeClassTag();
-    return new JavaPairRDD<>(rdd, kt, vt);
-  }
-
-  /**
-   * Perform a left outer join of this RDD&lt;T> and the Geode `Region&lt;K, V>`.
-   * The join key from RDD element is generated by `func(T) => K`, and the
-   * key from region is just the key of the key/value pair.
-   *
-   * For each element (t) in this RDD, the resulting RDD will either contain
-   * all pairs (t, Some(v)) for v in the Geode region, or the pair
-   * (t, None) if no element in the Geode region has key `func(t)`.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element T
-   * @param <K> the key type of the Geode region
-   * @param <V> the value type of the Geode region
-   * @return JavaPairRDD&lt;T, Option&lt;V>>
-   */
-  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(String regionPath, Function<T, K> func) {
-    return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
-  }
-
-  /**
-   * Perform a left outer join of this RDD&lt;T> and the Geode `Region&lt;K, V>`.
-   * The join key from RDD element is generated by `func(T) => K`, and the
-   * key from region is just the key of the key/value pair.
-   *
-   * For each element (t) in this RDD, the resulting RDD will either contain
-   * all pairs (t, Some(v)) for v in the Geode region, or the pair
-   * (t, None) if no element in the Geode region has key `func(t)`.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element T
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param <K> the key type of the Geode region
-   * @param <V> the value type of the Geode region
-   * @return JavaPairRDD&lt;T, Option&lt;V>>
-   */
-  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(
-    String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
-    GeodeOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
-    ClassTag<T> kt = fakeClassTag();
-    ClassTag<Option<V>> vt = fakeClassTag();
-    return new JavaPairRDD<>(rdd, kt, vt);
-  }
-
-}

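For a non-pair RDD the region key is derived by the supplied function; a sketch of the inner and outer join shapes (region path and key logic hypothetical):

    import org.apache.geode.spark.connector.javaapi.GeodeJavaUtil
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.api.java.function.{Function => JFunction}

    def demo(lines: JavaRDD[String]): Unit = {
      val byFirstWord = new JFunction[String, String] {
        override def call(line: String): String = line.split(" ")(0) // hypothetical key
      }
      val fns = GeodeJavaUtil.javaFunctions(lines)
      val joined = fns.joinGeodeRegion[String, Integer]("/lookup", byFirstWord)      // (t, v)
      val outer  = fns.outerJoinGeodeRegion[String, Integer]("/lookup", byFirstWord) // (t, Option(v))
      println(s"${joined.count()} matched of ${outer.count()} total")
    }
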
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
deleted file mode 100644
index 5e1e354..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector.javaapi;
-
-import org.apache.geode.spark.connector.GeodeConnectionConf;
-import org.apache.geode.spark.connector.GeodeSQLContextFunctions;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-
-/**
- * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide Geode
- * OQL functionality.
- *
- * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * org.apache.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaSQLContextFunctions {
-
-  public final GeodeSQLContextFunctions scf;
-
-  public GeodeJavaSQLContextFunctions(SQLContext sqlContext) {
-    scf = new GeodeSQLContextFunctions(sqlContext);
-  }
-
-  public <T> DataFrame geodeOQL(String query) {
-    DataFrame df = scf.geodeOQL(query, scf.defaultConnectionConf());
-    return df;
-  }
-
-  public <T> DataFrame geodeOQL(String query, GeodeConnectionConf connConf) {
-    DataFrame df = scf.geodeOQL(query, connConf);
-    return df;
-  }
-
-}

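geodeOQL ran an OQL query on the cluster and surfaced the result as a (Spark 1.x) DataFrame; a minimal sketch (the query and region are hypothetical):

    import org.apache.geode.spark.connector.javaapi.GeodeJavaUtil
    import org.apache.spark.sql.SQLContext

    def query(sqlContext: SQLContext): Unit = {
      val df = GeodeJavaUtil.javaFunctions(sqlContext)
        .geodeOQL("SELECT c.name, c.total FROM /customers c WHERE c.total > 100")
      df.show()
    }
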
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
deleted file mode 100644
index a257617..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector.javaapi;
-
-
-import org.apache.geode.spark.connector.GeodeConnectionConf;
-import org.apache.geode.spark.connector.internal.rdd.GeodeRegionRDD;
-import org.apache.geode.spark.connector.internal.rdd.GeodeRegionRDD$;
-import org.apache.spark.SparkContext;
-import static org.apache.geode.spark.connector.javaapi.JavaAPIHelper.*;
-
-import scala.reflect.ClassTag;
-import java.util.Properties;
-
-/**
- * Java API wrapper over {@link org.apache.spark.SparkContext} to provide Geode
- * Connector functionality.
- *
- * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link
- * org.apache.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
- */
-public class GeodeJavaSparkContextFunctions {
-
-  public final SparkContext sc;
-
-  public GeodeJavaSparkContextFunctions(SparkContext sc) {
-    this.sc = sc;
-  }
-
-  /**
-   * Expose a Geode region as a JavaPairRDD
-   * @param regionPath the full path of the region
-   * @param connConf the GeodeConnectionConf that can be used to access the region
-   * @param opConf the parameters for this operation, such as preferred partitioner.
-   */
-  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(
-    String regionPath, GeodeConnectionConf connConf, Properties opConf) {
-    ClassTag<K> kt = fakeClassTag();
-    ClassTag<V> vt = fakeClassTag();    
-    GeodeRegionRDD<K, V>  rdd =  GeodeRegionRDD$.MODULE$.apply(
-      sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
-    return new GeodeJavaRegionRDD<>(rdd);
-  }
-
-  /**
-   * Expose a Geode region as a JavaPairRDD with default GeodeConnector and no preferred partitioner.
-   * @param regionPath the full path of the region
-   */
-  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath) {
-    GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
-    return geodeRegion(regionPath, connConf, new Properties());
-  }
-
-  /**
-   * Expose a Geode region as a JavaPairRDD with no preferred partitioner.
-   * @param regionPath the full path of the region
-   * @param connConf the GeodeConnectionConf that can be used to access the region
-   */
-  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, GeodeConnectionConf connConf) {
-    return geodeRegion(regionPath, connConf, new Properties());
-  }
-
-  /**
-   * Expose a Geode region as a JavaPairRDD with default GeodeConnector.
-   * @param regionPath the full path of the region
-   * @param opConf the parameters for this operation, such as preferred partitioner.
-   */
-  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, Properties opConf) {
-    GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
-    return geodeRegion(regionPath, connConf, opConf);
-  }
-
-}

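geodeRegion exposed a region as an RDD, with the partitioner selectable through opConf; e.g., using the partitioner keys that GeodeJavaUtil defines below (region path and types hypothetical):

    import java.util.Properties
    import org.apache.geode.spark.connector.javaapi.GeodeJavaUtil
    import org.apache.spark.SparkContext

    def read(sc: SparkContext) = {
      val opConf = new Properties()
      opConf.setProperty(GeodeJavaUtil.PreferredPartitionerPropKey,
        GeodeJavaUtil.ServerSplitsPartitionerName)
      GeodeJavaUtil.javaFunctions(sc).geodeRegion[String, Integer]("/customers", opConf)
    }
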
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java b/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java
deleted file mode 100644
index 8f797ec..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/java/org/apache/geode/spark/connector/javaapi/GeodeJavaUtil.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector.javaapi;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import scala.Tuple2;
-
-import org.apache.geode.spark.connector.package$;
-
-/**
- * The main entry point to the Spark Geode Connector Java API.
- *
- * There are several helpful static factory methods which build useful wrappers
- * around Spark Context, Streaming Context and RDD. There are also helper methods
- * to convert JavaRDD&lt;Tuple2&lt;K, V&gt;&gt; to JavaPairRDD&lt;K, V&gt;.
- */
-public final class GeodeJavaUtil {
-  
-  /** constants */
-  public static String GeodeLocatorPropKey = package$.MODULE$.GeodeLocatorPropKey();
-  // partitioner related keys and values
-  public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
-  public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
-  public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
-  public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();
-  public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey();
-  public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault();
-  
-  /** The private constructor prevents users from creating instances of this class. */
-  private GeodeJavaUtil() { }
-
-  /**
-   * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
-   * on an existing {@link SparkContext} instance.
-   */
-  public static GeodeJavaSparkContextFunctions javaFunctions(SparkContext sc) {
-    return new GeodeJavaSparkContextFunctions(sc);
-  }
-
-  /**
-   * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
-   * on an existing {@link JavaSparkContext} instance.
-   */
-  public static GeodeJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
-    return new GeodeJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
-  }
-
-  /**
-   * A static factory method to create a {@link GeodeJavaPairRDDFunctions} based on an
-   * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
-   */
-  public static <K, V> GeodeJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
-    return new GeodeJavaPairRDDFunctions<K, V>(rdd);
-  }
-
-  /**
-   * A static factory method to create a {@link GeodeJavaRDDFunctions} based on an
-   * existing {@link org.apache.spark.api.java.JavaRDD} instance.
-   */
-  public static <T> GeodeJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
-    return new GeodeJavaRDDFunctions<T>(rdd);
-  }
-
-  /**
-   * A static factory method to create a {@link GeodeJavaPairDStreamFunctions} based on an
-   * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
-   */
-  public static <K, V> GeodeJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
-    return new GeodeJavaPairDStreamFunctions<>(ds);
-  }
-
-  /**
-   * A static factory method to create a {@link GeodeJavaDStreamFunctions} based on an
-   * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
-   */
-  public static <T> GeodeJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
-    return new GeodeJavaDStreamFunctions<>(ds);
-  }
-
-  /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}&lt;Tuple2&lt;K, V&gt;&gt;
-   * to a {@link org.apache.spark.api.java.JavaPairRDD}&lt;K, V&gt;.
-   */
-  public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
-    return JavaAPIHelper.toJavaPairRDD(rdd);
-  }
-
-  /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}&lt;Tuple2&lt;K, V&gt;&gt;
-   * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}&lt;K, V&gt;.
-   */
-  public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
-    return JavaAPIHelper.toJavaPairDStream(ds);
-  }
-
-  /**
-   * A static factory method to create a {@link GeodeJavaSQLContextFunctions} based
-   * on an existing {@link SQLContext} instance.
-   */
-  public static GeodeJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
-    return new GeodeJavaSQLContextFunctions(sqlContext);
-  }
-
-}
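
For context, a minimal sketch of how the Java API above was typically wired up (shown from Scala, since the static helpers are callable from either language). The app name, data, and locator "localhost[10334]" are illustrative assumptions, as is the host[port] locator format:

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.geode.spark.connector.javaapi.GeodeJavaUtil
    import scala.collection.JavaConverters._

    // Point Spark at a (hypothetical) Geode locator via the constant exposed above.
    val conf = new SparkConf().setAppName("geode-java-api-demo").setMaster("local[2]")
      .set(GeodeJavaUtil.GeodeLocatorPropKey, "localhost[10334]")
    val jsc = new JavaSparkContext(conf)

    // Scala tuples are Tuple2 instances, so this yields a JavaRDD<Tuple2<String, Int>> ...
    val rdd = jsc.parallelize(Seq(("a", 1), ("b", 2)).asJava)
    // ... which the helper converts into a JavaPairRDD<String, Int>.
    val pairRdd = GeodeJavaUtil.toJavaPairRDD(rdd)
    // Wrap it to get the Geode-specific save/join operations.
    val geodeFns = GeodeJavaUtil.javaFunctions(pairRdd)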

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala
deleted file mode 100644
index 4343b90..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector
-
-import org.apache.geode.cache.execute.ResultCollector
-import org.apache.geode.cache.query.Query
-import org.apache.geode.cache.Region
-import org.apache.geode.spark.connector.internal.RegionMetadata
-import org.apache.geode.spark.connector.internal.rdd.GeodeRDDPartition
-
-
-trait GeodeConnection {
-
-  /**
-   * Validate region existence and key/value type constraints, and throw a RuntimeException
-   * if the region does not exist or the key and/or value types do not match.
-   * @param regionPath the full path of the region
-   */
-  def validateRegion[K, V](regionPath: String): Unit
-
-  /**
-   * Get a Region proxy for the given region
-   * @param regionPath the full path of the region
-   */
-  def getRegionProxy[K, V](regionPath: String): Region[K, V]
-
-  /**
-   * Retrieve region metadata for the given region.
-   * @param regionPath the full path of the region
-   * @return Some[RegionMetadata] if the region exists, None otherwise
-   */
-  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
-
-  /** 
-   * Retrieve region data for the given region and bucket set
-   * @param regionPath the full path of the region
-   * @param whereClause the optional where clause that filters region entries
-   * @param split the Geode RDD partition instance
-   */
-  def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)]
-
-  def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
-  /** 
-   * Create a Geode OQL query
-   * @param queryString Geode OQL query string
-   */
-  def getQuery(queryString: String): Query
-
-  /** Close the connection */
-  def close(): Unit
-}
-
-
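
For reference, a minimal sketch of how this now-removed trait was consumed; the locator string (host[port] format assumed) and region path are illustrative:

    import org.apache.geode.spark.connector.GeodeConnectionConf

    val connConf = GeodeConnectionConf("localhost[10334]")     // hypothetical locator
    val conn = connConf.getConnection
    conn.validateRegion[String, String]("/exampleRegion")      // throws RuntimeException on mismatch
    val meta = conn.getRegionMetadata[String, String]("/exampleRegion")
    println(meta)                                              // Some(RegionMetadata) or None
    conn.close()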

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala
deleted file mode 100644
index cf0d7b6..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector
-
-import org.apache.spark.SparkConf
-import org.apache.geode.spark.connector.internal.{DefaultGeodeConnectionManager, LocatorHelper}
-
-/**
- * Stores the configuration of a connection to a Geode cluster. It is serializable and
- * can safely be sent over the network.
- *
- * @param locators Geode locator (host, port) pairs; the default is (localhost, 10334)
- * @param geodeProps the initial Geode properties to be used
- * @param connectionManager GeodeConnectionManager instance
- */
-class GeodeConnectionConf(
-   val locators: Seq[(String, Int)], 
-   val geodeProps: Map[String, String] = Map.empty,
-   connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager
-  ) extends Serializable {
-
-  /** require at least 1 pair of (host,port) */
-  require(locators.nonEmpty)
-  
-  def getConnection: GeodeConnection = connectionManager.getConnection(this)
-  
-}
-
-object GeodeConnectionConf {
-
-  /**
-   * Create a GeodeConnectionConf object based on a locator string and an optional GeodeConnectionManager
-   * @param locatorStr Geode cluster locator string
-   * @param connectionManager GeodeConnectionManager instance
-   */
-  def apply(locatorStr: String, geodeProps: Map[String, String] = Map.empty)
-    (implicit connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager): GeodeConnectionConf = {
-    new GeodeConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), geodeProps, connectionManager)
-  }
-
-  /**
-   * Create a GeodeConnectionConf object based on a SparkConf. Note that an implicit can
-   * be used to control which GeodeConnectionManager instance to use if desired
-   * @param conf a SparkConf instance
-   */
-  def apply(conf: SparkConf): GeodeConnectionConf = {
-    val locatorStr = conf.getOption(GeodeLocatorPropKey).getOrElse(
-      throw new RuntimeException(s"SparkConf does not contain property $GeodeLocatorPropKey"))
-    // SparkConf only holds properties whose key starts with "spark.". In order to
-    // put Geode properties in SparkConf, all Geode properties are prefixed with
-    // "spark.geode.". This prefix is removed before the properties are put in `geodeProps`.
-    val prefix = "spark.geode."
-    val geodeProps = conf.getAll.filter {
-        case (k, v) => k.startsWith(prefix) && k != GeodeLocatorPropKey
-      }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
-    apply(locatorStr, geodeProps)
-  }
-
-}
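
A short sketch of both factory methods above. It is assumed here that "spark.geode.locators" is the value of GeodeLocatorPropKey, that locators use the host[port] format, and the extra property is purely illustrative:

    import org.apache.spark.SparkConf
    import org.apache.geode.spark.connector.GeodeConnectionConf

    // Directly from a locator string, with optional extra Geode properties.
    val direct = GeodeConnectionConf("localhost[10334]", Map("security-username" -> "demo"))

    // From a SparkConf: every "spark.geode.*" entry is carried over with the prefix stripped.
    val sparkConf = new SparkConf()
      .set("spark.geode.locators", "localhost[10334]")
      .set("spark.geode.security-username", "demo")
    val fromSpark = GeodeConnectionConf(sparkConf)   // geodeProps == Map("security-username" -> "demo")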

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala
deleted file mode 100644
index b0dc3ee..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector
-
-/**
- * GeodeConnectionManager provides a common interface that manages Geode
- * connections, and it is serializable. Each manager instance handles
- * connection creation and connection pool management.
- */
-trait GeodeConnectionManager extends Serializable {
-
-  /** Get the connection for the given connection configuration */
-  def getConnection(connConf: GeodeConnectionConf): GeodeConnection
-
-  /** Close the connection */
-  def closeConnection(connConf: GeodeConnectionConf): Unit
-}
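
As a sketch of the contract, a toy manager that always hands back one pre-built connection (purely illustrative; the connector's real implementation is DefaultGeodeConnectionManager):

    import org.apache.geode.spark.connector.{GeodeConnection, GeodeConnectionConf, GeodeConnectionManager}

    // A toy manager: no pooling, one shared connection for every configuration.
    class SingleConnectionManager(conn: GeodeConnection) extends GeodeConnectionManager {
      override def getConnection(connConf: GeodeConnectionConf): GeodeConnection = conn
      override def closeConnection(connConf: GeodeConnectionConf): Unit = conn.close()
    }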

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala
deleted file mode 100644
index 0229306..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector
-
-import java.io.File
-import java.net.URL
-import org.apache.commons.httpclient.methods.PostMethod
-import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.spark.Logging
-
-object GeodeFunctionDeployer {
-  def main(args: Array[String]) {
-    new GeodeFunctionDeployer(new HttpClient()).commandLineRun(args)
-  }
-}
-
-class GeodeFunctionDeployer(val httpClient:HttpClient) extends Logging {
-
-  def deploy(host: String, port: Int, jarLocation: String): String =
-    deploy(host + ":" + port, jarLocation)
-  
-  def deploy(host: String, port: Int, jar:File): String =
-    deploy(host + ":" + port, jar)
-  
-  def deploy(jmxHostAndPort: String, jarLocation: String): String =
-    deploy(jmxHostAndPort, jarFileHandle(jarLocation))
-  
-  def deploy(jmxHostAndPort: String, jar: File): String = {
-    val urlString = constructURLString(jmxHostAndPort)
-    val filePost: PostMethod = new PostMethod(urlString)
-    val parts: Array[Part] = new Array[Part](1)
-    parts(0) = new FilePart("resources", jar)
-    filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
-    val status: Int = httpClient.executeMethod(filePost)
-    "Deployed Jar with status:" + status
-  }
-
-  private[connector] def constructURLString(jmxHostAndPort: String) =
-    "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
-
-  private[connector] def jarFileHandle(jarLocation: String) = {
-    val f: File = new File(jarLocation)
-    if (!f.exists()) {
-      val errorMessage: String = "Invalid jar file: " + f.getAbsolutePath
-      logInfo(errorMessage)
-      throw new RuntimeException(errorMessage)
-    }
-    f
-  }
-  
-  def commandLineRun(args: Array[String]): Unit = {
-    val (hostPort: String, jarFile: String) =
-    if (args.length < 2) {
-      logInfo("JMX Manager Host and Port (example: localhost:7070):")
-      val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
-      val jmxHostAndPort = bufferedReader.readLine()
-      logInfo("Location of geode-functions.jar:")
-      val functionJarLocation = bufferedReader.readLine()
-      (jmxHostAndPort, functionJarLocation)
-    } else {
-      (args(0), args(1))
-    }
-    val status = deploy(hostPort, jarFile)
-    logInfo(status)
-  }
-}
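
A minimal usage sketch of the deployer above; the JMX manager address and jar path are hypothetical:

    import org.apache.commons.httpclient.HttpClient
    import org.apache.geode.spark.connector.GeodeFunctionDeployer

    val deployer = new GeodeFunctionDeployer(new HttpClient())
    // POSTs the jar to http://localhost:7070/gemfire/v1/deployed as multipart form data.
    val status = deployer.deploy("localhost:7070", "geode-functions/target/geode-functions.jar")
    println(status)   // "Deployed jar with status: <http status code>"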

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala
deleted file mode 100644
index e09e67c..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector
-
-import com.esotericsoftware.kryo.Kryo
-import org.apache.geode.spark.connector.internal.oql.UndefinedSerializer
-import org.apache.spark.serializer.KryoRegistrator
-import org.apache.geode.cache.query.internal.Undefined
-
-class GeodeKryoRegistrator extends KryoRegistrator {
-
-  override def registerClasses(kryo: Kryo): Unit = {
-    kryo.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
-  }
-}
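
To activate the registrator, it is handed to Spark through the standard Kryo configuration keys; a minimal sketch:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.geode.spark.connector.GeodeKryoRegistrator")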

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala
deleted file mode 100644
index d0b6684..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector
-
-import org.apache.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodePairRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.Function
-import org.apache.spark.rdd.RDD
-
-/**
- * Extra Geode functions on RDDs of (key, value) pairs, made available through an implicit conversion.
- * Import `org.apache.geode.spark.connector._` at the top of your program to
- * use these functions.
- */
-class GeodePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
-
-  /**
-   * Save the RDD of pairs to the Geode key-value store without any conversion
-   * @param regionPath the full path of the region where the RDD is stored
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the optional parameters for this operation
-   */
-  def saveToGeode(
-      regionPath: String, 
-      connConf: GeodeConnectionConf = defaultConnectionConf, 
-      opConf: Map[String, String] = Map.empty): Unit = {    
-    connConf.getConnection.validateRegion[K, V](regionPath)
-    if (log.isDebugEnabled)
-      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  ${getRddPartitionsInfo(rdd)}""")
-    else
-      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
-    val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
-    rdd.sparkContext.runJob(rdd, writer.write _)
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this`
-   * RDD and the Geode `Region[K, V2]`. Each pair of elements will be returned
-   * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
-   * Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @tparam K2 the key type of the Geode region
-   * @tparam V2 the value type of the Geode region
-   * @return RDD[((K, V), V2)]
-   */
-  def joinGeodeRegion[K2 <: K, V2](
-    regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K, V2] = {
-    new GeodeJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
-   * and the Geode `Region[K2, V2]`. The join key from RDD element is generated by
-   * `func(K, V) => K2`, and the key from the Geode region is just the key of the
-   * key/value pair.
-   *
-   * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
-   * where (k, v) is in `this` RDD and (k2, v2) is in the Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element (K, V)
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @tparam K2 the key type of the Geode region
-   * @tparam V2 the value type of the Geode region
-   * @return RDD[(K, V), V2]
-   */
-  def joinGeodeRegion[K2, V2](
-    regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K2, V2] =
-    new GeodeJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
-
-  /** This version of joinGeodeRegion(...) is just for Java API. */
-  private[connector] def joinGeodeRegion[K2, V2](
-    regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = {
-    new GeodeJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
-  }
-
-  /**
-   * Perform a left outer join of `this` RDD and the Geode `Region[K, V2]`.
-   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
-   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
-   * ((k, v), None) if no element in the Geode region has key k.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @tparam K2 the key type of the Geode region
-   * @tparam V2 the value type of the Geode region
-   * @return RDD[((K, V), Option[V2])]
-   */
-  def outerJoinGeodeRegion[K2 <: K, V2](
-    regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K, V2] = {
-    new GeodeOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
-  }
-
-  /**
-   * Perform a left outer join of `this` RDD and the Geode `Region[K2, V2]`.
-   * The join key from RDD element is generated by `func(K, V) => K2`, and the
-   * key from the region is just the key of the key/value pair.
-   *
-   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
-   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
-   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates region key from RDD element (K, V)
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @tparam K2 the key type of the Geode region
-   * @tparam V2 the value type of the Geode region
-   * @return RDD[((K, V), Option[V2])]
-   */
-  def outerJoinGeodeRegion[K2, V2](
-    regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
-    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
-  }
-
-  /** This version of outerJoinGeodeRegion(...) is just for Java API. */
-  private[connector] def outerJoinGeodeRegion[K2, V2](
-    regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
-    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
-  }
-
-  private[connector] def defaultConnectionConf: GeodeConnectionConf =
-    GeodeConnectionConf(rdd.sparkContext.getConf)
-
-}
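
A minimal sketch of the pair-RDD operations above, using the implicit conversion described in the class comment; the locator and region path are illustrative assumptions:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.geode.spark.connector._   // implicit conversion to GeodePairRDDFunctions

    val conf = new SparkConf().setAppName("pair-rdd-demo").setMaster("local[2]")
      .set(GeodeLocatorPropKey, "localhost[10334]")   // hypothetical locator, host[port] format assumed
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
    pairs.saveToGeode("/strRegion")                                  // write the pairs as-is
    val joined = pairs.joinGeodeRegion[String, String]("/strRegion") // ((k, v), v2) pairs
    joined.collect().foreach(println)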

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala
deleted file mode 100644
index 5649fff..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector
-
-import org.apache.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodeRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.{PairFunction, Function}
-import org.apache.spark.rdd.RDD
-
-/**
- * Extra Geode functions on non-pair RDDs, made available through an implicit conversion.
- * Import `org.apache.geode.spark.connector._` at the top of your program to 
- * use these functions.  
- */
-class GeodeRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
-
-  /**
-   * Save the non-pair RDD to the Geode key-value store.
-   * @param regionPath the full path of the region where the RDD is stored
-   * @param func the function that converts elements of RDD to key/value pairs
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the optional parameters for this operation
-   */
-  def saveToGeode[K, V](
-      regionPath: String, 
-      func: T => (K, V), 
-      connConf: GeodeConnectionConf = defaultConnectionConf,
-      opConf: Map[String, String] = Map.empty): Unit = {
-    connConf.getConnection.validateRegion[K, V](regionPath)
-    if (log.isDebugEnabled)
-      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  ${getRddPartitionsInfo(rdd)}""")
-    else
-      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
-    val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
-    rdd.sparkContext.runJob(rdd, writer.write(func) _)
-  }
-
-  /** This version of saveToGeode(...) is just for Java API. */
-  private[connector] def saveToGeode[K, V](
-      regionPath: String, 
-      func: PairFunction[T, K, V], 
-      connConf: GeodeConnectionConf, 
-      opConf: Map[String, String]): Unit = {
-    saveToGeode[K, V](regionPath, func.call _, connConf, opConf)
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
-   * and the Geode `Region[K, V]`. The join key from RDD element is generated by
-   * `func(T) => K`, and the key from the Geode region is just the key of the
-   * key/value pair.
-   *
-   * Each pair of elements of the result RDD will be returned as a (t, v) tuple,
-   * where t is in `this` RDD and (k, v) is in the Geode region.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates the region key from RDD element T
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @tparam K the key type of the Geode region
-   * @tparam V the value type of the Geode region
-   * @return RDD[(T, V)]
-   */
-  def joinGeodeRegion[K, V](regionPath: String, func: T => K, 
-    connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[T, K, V] = {
-    new GeodeJoinRDD[T, K, V](rdd, func, regionPath, connConf)    
-  }
-
-  /** This version of joinGeodeRegion(...) is just for Java API. */
-  private[connector] def joinGeodeRegion[K, V](
-    regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeJoinRDD[T, K, V] = {
-    joinGeodeRegion(regionPath, func.call _, connConf)
-  }
-
-  /**
-   * Perform a left outer join of `this` RDD and the Geode `Region[K, V]`.
-   * The join key from RDD element is generated by `func(T) => K`, and the
-   * key from region is just the key of the key/value pair.
-   *
-   * For each element t in `this` RDD, the resulting RDD will either contain
-   * all pairs (t, Some(v)) for v in the Geode region, or the pair
-   * (t, None) if no element in the Geode region has key `func(t)`.
-   *
-   * @param regionPath the region path of the Geode region
-   * @param func the function that generates the region key from RDD element T
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @tparam K the key type of the Geode region
-   * @tparam V the value type of the Geode region
-   * @return RDD[(T, Option[V])]
-   */
-  def outerJoinGeodeRegion[K, V](regionPath: String, func: T => K,
-    connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[T, K, V] = {
-    new GeodeOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)
-  }
-
-  /** This version of outerJoinGeodeRegion(...) is just for Java API. */
-  private[connector] def outerJoinGeodeRegion[K, V](
-    regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[T, K, V] = {
-    outerJoinGeodeRegion(regionPath, func.call _, connConf)
-  }
-
-  private[connector] def defaultConnectionConf: GeodeConnectionConf =
-    GeodeConnectionConf(rdd.sparkContext.getConf)
-
-}
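
And the non-pair counterpart: a sketch of saveToGeode with a key-extraction function, plus an outer join back against the (hypothetical) region:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.geode.spark.connector._   // implicit conversion to GeodeRDDFunctions

    case class Emp(id: Int, name: String)

    val conf = new SparkConf().setAppName("rdd-demo").setMaster("local[2]")
      .set(GeodeLocatorPropKey, "localhost[10334]")   // hypothetical locator
    val sc = new SparkContext(conf)

    val emps = sc.parallelize(Seq(Emp(1, "Ann"), Emp(2, "Bob")))
    emps.saveToGeode[Int, Emp]("/emps", e => (e.id, e))             // derive a (key, value) per element
    val byKey = emps.outerJoinGeodeRegion[Int, Emp]("/emps", _.id)  // (Emp, Option[Emp]) pairs
    byKey.collect().foreach(println)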
-
-

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala
deleted file mode 100644
index 433c066..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.spark.connector
-
-import org.apache.geode.spark.connector.internal.oql.{OQLRelation, QueryRDD}
-import org.apache.spark.Logging
-import org.apache.spark.sql.{DataFrame, SQLContext}
-
-/**
- * Provides Geode OQL-specific functions
- */
-class GeodeSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging {
-
-  /**
-   * Expose a Geode OQL query result as a DataFrame
-   * @param query the OQL query string.
-   */
-  def geodeOQL(
-    query: String,
-    connConf: GeodeConnectionConf = GeodeConnectionConf(sqlContext.sparkContext.getConf)): DataFrame = {
-    logInfo(s"OQL query = $query")
-    val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
-    sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext))
-  }
-
-  private[connector] def defaultConnectionConf: GeodeConnectionConf =
-    GeodeConnectionConf(sqlContext.sparkContext.getConf)
-}
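
A minimal sketch of geodeOQL, assuming the same implicit-conversion pattern as the RDD functions above and a Spark 1.x SQLContext; the region and query are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.geode.spark.connector._   // assumed implicit conversion to GeodeSQLContextFunctions

    val conf = new SparkConf().setAppName("oql-demo").setMaster("local[2]")
      .set(GeodeLocatorPropKey, "localhost[10334]")   // hypothetical locator
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.geodeOQL("SELECT e.id, e.name FROM /emps e")
    df.show()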

