geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [05/50] [abbrv] geode git commit: GEODE-194: Remove spark connector
Date Tue, 13 Jun 2017 00:29:40 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/JavaApiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/JavaApiIntegrationTest.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/JavaApiIntegrationTest.java
deleted file mode 100644
index 281236f..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/JavaApiIntegrationTest.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector;
-
-import org.apache.geode.cache.Region;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.spark.connector.GeodeConnection;
-import org.apache.geode.spark.connector.GeodeConnectionConf;
-import org.apache.geode.spark.connector.GeodeConnectionConf$;
-import org.apache.geode.spark.connector.internal.DefaultGeodeConnectionManager$;
-import org.apache.geode.spark.connector.javaapi.GeodeJavaRegionRDD;
-import ittest.org.apache.geode.spark.connector.testkit.GeodeCluster$;
-import ittest.org.apache.geode.spark.connector.testkit.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.scalatest.junit.JUnitSuite;
-import org.apache.geode.spark.connector.package$;
-import scala.Tuple2;
-import scala.Option;
-import scala.Some;
-import java.util.*;
-
-import static org.apache.geode.spark.connector.javaapi.GeodeJavaUtil.RDDSaveBatchSizePropKey;
-import static org.apache.geode.spark.connector.javaapi.GeodeJavaUtil.javaFunctions;
-import static org.junit.Assert.*;
-
-public class JavaApiIntegrationTest extends JUnitSuite {
-
-  static JavaSparkContext jsc = null;
-  static GeodeConnectionConf connConf = null;
-  
-  static int numServers = 2;
-  static int numObjects = 1000;
-  static String regionPath = "pr_str_int_region";
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    // start geode cluster, and spark context
-    Properties settings = new Properties();
-    settings.setProperty(ConfigurationProperties.CACHE_XML_FILE, "src/it/resources/test-retrieve-regions.xml");
-    settings.setProperty("num-of-servers", Integer.toString(numServers));
-    int locatorPort = GeodeCluster$.MODULE$.start(settings);
-
-    // start spark context in local mode
-    Properties props = new Properties();
-    props.put("log4j.logger.org.apache.spark", "INFO");
-    props.put("log4j.logger.org.apache.geode.spark.connector","DEBUG");
-    IOUtils.configTestLog4j("ERROR", props);
-    SparkConf conf = new SparkConf()
-            .setAppName("RetrieveRegionIntegrationTest")
-            .setMaster("local[2]")
-            .set(package$.MODULE$.GeodeLocatorPropKey(), "localhost:"+ locatorPort);
-    // sc = new SparkContext(conf);
-    jsc = new JavaSparkContext(conf);
-    connConf = GeodeConnectionConf.apply(jsc.getConf());
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    // stop connection, spark context, and geode cluster
-    DefaultGeodeConnectionManager$.MODULE$.closeConnection(GeodeConnectionConf$.MODULE$.apply(jsc.getConf()));
-    jsc.stop();
-    GeodeCluster$.MODULE$.stop();
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //   utility methods
-  // --------------------------------------------------------------------------------------------
-
-  private <K,V> void matchMapAndPairList(Map<K,V> map, List<Tuple2<K,V>> list) {
-    assertTrue("size mismatch \nmap: " + map.toString() + "\nlist: " + list.toString(), map.size() == list.size());
-    for (Tuple2<K, V> p : list) {
-      assertTrue("value mismatch: k=" + p._1() + " v1=" + p._2() + " v2=" + map.get(p._1()),
-                 p._2().equals(map.get(p._1())));
-    }
-  }
-
-  private Region<String, Integer> prepareStrIntRegion(String regionPath, int start, int stop) {
-    HashMap<String, Integer> entriesMap = new HashMap<>();
-    for (int i = start; i < stop; i ++) {
-      entriesMap.put("k_" + i, i);
-    }
-
-    GeodeConnection conn = connConf.getConnection();
-    Region<String, Integer> region = conn.getRegionProxy(regionPath);
-    region.removeAll(region.keySetOnServer());
-    region.putAll(entriesMap);
-    return region;
-  }
-
-  private JavaPairRDD<String, Integer> prepareStrIntJavaPairRDD(int start, int stop) {
-    List<Tuple2<String, Integer>> data = new ArrayList<>();
-    for (int i = start; i < stop; i ++) {
-      data.add(new Tuple2<>("k_" + i, i));
-    }
-    return jsc.parallelizePairs(data);
-  }
-
-  private JavaPairRDD<Integer, Integer> prepareIntIntJavaPairRDD(int start, int stop) {
-    List<Tuple2<Integer, Integer>> data = new ArrayList<>();
-    for (int i = start; i < stop; i ++) {
-      data.add(new Tuple2<>(i, i * 2));
-    }
-    return jsc.parallelizePairs(data);
-  }
-
-  private JavaRDD<Integer> prepareIntJavaRDD(int start, int stop) {
-    List<Integer> data = new ArrayList<>();
-    for (int i = start; i < stop; i ++) {
-      data.add(i);
-    }
-    return jsc.parallelize(data);
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //   JavaRDD.saveToGeode
-  // --------------------------------------------------------------------------------------------
-
-  static class IntToStrIntPairFunction implements PairFunction<Integer, String, Integer> {
-    @Override public Tuple2<String, Integer> call(Integer x) throws Exception {
-      return new Tuple2<>("k_" + x, x);
-    }
-  }
-
-  @Test
-  public void testRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception {
-    verifyRDDSaveToGeode(true, true);
-  }
-
-  @Test
-  public void testRDDSaveToGeodeWithDefaultConnConf() throws Exception {
-    verifyRDDSaveToGeode(true, false);
-  }
-  
-  @Test
-  public void testRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
-    verifyRDDSaveToGeode(false, true);
-  }
-
-  @Test
-  public void testRDDSaveToGeodeWithConnConf() throws Exception {
-    verifyRDDSaveToGeode(false, false);
-  }
-  
-  public void verifyRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
-    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  // remove all entries
-    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects);
-
-    PairFunction<Integer, String, Integer> func = new IntToStrIntPairFunction();
-    Properties opConf = new Properties();
-    opConf.put(RDDSaveBatchSizePropKey, "200");
-
-    if (useDefaultConnConf) {
-      if (useOpConf)
-        javaFunctions(rdd1).saveToGeode(regionPath, func, opConf);
-      else
-        javaFunctions(rdd1).saveToGeode(regionPath, func);
-    } else {
-      if (useOpConf)
-        javaFunctions(rdd1).saveToGeode(regionPath, func, connConf, opConf);
-      else
-        javaFunctions(rdd1).saveToGeode(regionPath, func, connConf);
-    }
-    
-    Set<String> keys = region.keySetOnServer();
-    Map<String, Integer> map = region.getAll(keys);
-
-    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
-
-    for (int i = 0; i < numObjects; i ++) {
-      expectedList.add(new Tuple2<>("k_" + i, i));
-    }
-    matchMapAndPairList(map, expectedList);
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //   JavaPairRDD.saveToGeode
-  // --------------------------------------------------------------------------------------------
-
-  @Test
-  public void testPairRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception {
-    verifyPairRDDSaveToGeode(true, true);
-  }
-
-  @Test
-  public void testPairRDDSaveToGeodeWithDefaultConnConf() throws Exception {
-    verifyPairRDDSaveToGeode(true, false);
-  }
-  
-  @Test
-  public void testPairRDDSaveToGeodeWithConnConfAndOpConf() throws Exception {
-    verifyPairRDDSaveToGeode(false, true);
-  }
-
-  @Test
-  public void testPairRDDSaveToGeodeWithConnConf() throws Exception {
-    verifyPairRDDSaveToGeode(false, false);
-  }
-  
-  public void verifyPairRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
-    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  // remove all entries
-    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, numObjects);
-    Properties opConf = new Properties();
-    opConf.put(RDDSaveBatchSizePropKey, "200");
-
-    if (useDefaultConnConf) {
-      if (useOpConf)
-        javaFunctions(rdd1).saveToGeode(regionPath, opConf);
-      else
-        javaFunctions(rdd1).saveToGeode(regionPath);
-    } else {
-      if (useOpConf)
-        javaFunctions(rdd1).saveToGeode(regionPath, connConf, opConf);
-      else
-        javaFunctions(rdd1).saveToGeode(regionPath, connConf);
-    }
-
-    Set<String> keys = region.keySetOnServer();
-    Map<String, Integer> map = region.getAll(keys);
-
-    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
-    for (int i = 0; i < numObjects; i ++) {
-      expectedList.add(new Tuple2<>("k_" + i, i));
-    }
-    matchMapAndPairList(map, expectedList);
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //   JavaSparkContext.geodeRegion and where clause
-  // --------------------------------------------------------------------------------------------
-
-  @Test
-  public void testJavaSparkContextGeodeRegion() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);  // remove all entries
-    Properties emptyProps = new Properties();
-    GeodeJavaRegionRDD<String, Integer> rdd1 = javaFunctions(jsc).geodeRegion(regionPath);
-    GeodeJavaRegionRDD<String, Integer> rdd2 = javaFunctions(jsc).geodeRegion(regionPath, emptyProps);
-    GeodeJavaRegionRDD<String, Integer> rdd3 = javaFunctions(jsc).geodeRegion(regionPath, connConf);
-    GeodeJavaRegionRDD<String, Integer> rdd4 = javaFunctions(jsc).geodeRegion(regionPath, connConf, emptyProps);
-    GeodeJavaRegionRDD<String, Integer> rdd5 = rdd1.where("value.intValue() < 50");
-
-    HashMap<String, Integer> expectedMap = new HashMap<>();
-    for (int i = 0; i < numObjects; i ++) {
-      expectedMap.put("k_" + i, i);
-    }
-
-    matchMapAndPairList(expectedMap, rdd1.collect());
-    matchMapAndPairList(expectedMap, rdd2.collect());
-    matchMapAndPairList(expectedMap, rdd3.collect());
-    matchMapAndPairList(expectedMap, rdd4.collect());
-
-    HashMap<String, Integer> expectedMap2 = new HashMap<>();
-    for (int i = 0; i < 50; i ++) {
-      expectedMap2.put("k_" + i, i);
-    }
-
-    matchMapAndPairList(expectedMap2, rdd5.collect());
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //   JavaPairRDD.joinGeodeRegion
-  // --------------------------------------------------------------------------------------------
-
-  @Test
-  public void testPairRDDJoinWithSameKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
-
-    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath);
-    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, connConf);
-    // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
-
-    HashMap<Tuple2<String, Integer>, Integer> expectedMap = new HashMap<>();
-    for (int i = 0; i < 10; i ++) {
-      expectedMap.put(new Tuple2<>("k_" + i, i), i);
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  static class IntIntPairToStrKeyFunction implements Function<Tuple2<Integer, Integer>, String> {
-    @Override public String call(Tuple2<Integer, Integer> pair) throws Exception {
-      return "k_" + pair._1();
-    }
-  }
-
-  @Test
-  public void testPairRDDJoinWithDiffKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
-    Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();
-
-    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
-    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
-
-    HashMap<Tuple2<Integer, Integer>, Integer> expectedMap = new HashMap<>();
-    for (int i = 0; i < 10; i ++) {
-      expectedMap.put(new Tuple2<>(i, i * 2), i);
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //   JavaPairRDD.outerJoinGeodeRegion
-  // --------------------------------------------------------------------------------------------
-
-  @Test
-  public void testPairRDDOuterJoinWithSameKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
-
-    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath);
-    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
-
-    HashMap<Tuple2<String, Integer>, Option<Integer>> expectedMap = new HashMap<>();
-    for (int i = -5; i < 10; i ++) {
-      if (i < 0)
-        expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) null));
-      else
-        expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i));
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  @Test
-  public void testPairRDDOuterJoinWithDiffKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
-    Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();
-
-    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
-    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
-
-    HashMap<Tuple2<Integer, Integer>, Option<Integer>> expectedMap = new HashMap<>();
-    for (int i = -5; i < 10; i ++) {
-      if (i < 0)
-        expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null));
-      else
-        expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i));
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //   JavaRDD.joinGeodeRegion
-  // --------------------------------------------------------------------------------------------
-
-  static class IntToStrKeyFunction implements Function<Integer, String> {
-    @Override public String call(Integer x) throws Exception {
-      return "k_" + x;
-    }
-  }
-
-  @Test
-  public void testRDDJoinWithSameKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
-
-    Function<Integer, String> func = new IntToStrKeyFunction();
-    JavaPairRDD<Integer, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func);
-    JavaPairRDD<Integer, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
-
-    HashMap<Integer, Integer> expectedMap = new HashMap<>();
-    for (int i = 0; i < 10; i ++) {
-      expectedMap.put(i, i);
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //   JavaRDD.outerJoinGeodeRegion
-  // --------------------------------------------------------------------------------------------
-
-  @Test
-  public void testRDDOuterJoinWithSameKeyType() throws Exception {
-    prepareStrIntRegion(regionPath, 0, numObjects);
-    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
-
-    Function<Integer, String> func = new IntToStrKeyFunction();
-    JavaPairRDD<Integer, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func);
-    JavaPairRDD<Integer, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf);
-    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
-
-    HashMap<Integer, Option<Integer>> expectedMap = new HashMap<>();
-    for (int i = -5; i < 10; i ++) {
-      if (i < 0)
-        expectedMap.put(i, Option.apply((Integer) null));
-      else
-        expectedMap.put(i, Some.apply(i));
-    }
-    matchMapAndPairList(expectedMap, rdd2a.collect());
-    matchMapAndPairList(expectedMap, rdd2b.collect());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Portfolio.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Portfolio.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Portfolio.java
deleted file mode 100644
index dccebc9..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Portfolio.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector;
-
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.geode.cache.Declarable;
-
-/**
- * A stock portfolio that consists of multiple {@link Position} objects that
- * represent shares of stock (a "security").  Instances of
- * <code>Portfolio</code> can be stored in a Geode <code>Region</code> and
- * their contents can be queried using the Geode query service.
- * </p>
- * This class is <code>Serializable</code> because we want it to be distributed
- * to multiple members of a distributed system.  Because this class is
- * <code>Declarable</code>, we can describe instances of it in a Geode
- * <code>cache.xml</code> file.
- * </p>
- *
- */
-public class Portfolio implements Declarable, Serializable {
-
-  private static final long serialVersionUID = 9097335119586059309L;
-
-  private int id;  /* id is used as the entry key and is stored in the entry */
-  private String type;
-  private Map<String,Position> positions = new LinkedHashMap<String,Position>();
-  private String status;
-
-  public Portfolio(Properties props) {
-    init(props);
-  }
-
-  @Override
-  public void init(Properties props) {
-    this.id = Integer.parseInt(props.getProperty("id"));
-    this.type = props.getProperty("type", "type1");
-    this.status = props.getProperty("status", "active");
-
-    // get the positions. These are stored in the properties object
-    // as Positions, not String, so use Hashtable protocol to get at them.
-    // the keys are named "positionN", where N is an integer.
-    for (Map.Entry<Object, Object> entry: props.entrySet()) {
-      String key = (String)entry.getKey();
-      if (key.startsWith("position")) {
-        Position pos = (Position)entry.getValue();
-        this.positions.put(pos.getSecId(), pos);
-      }
-    }
-  }
-
-  public void setType(String t) {this.type = t; }
-
-  public String getStatus(){
-    return status;
-  }
-
-  public int getId(){
-    return this.id;
-  }
-
-  public Map<String,Position> getPositions(){
-    return this.positions;
-  }
-
-  public String getType() {
-    return this.type;
-  }
-
-  public boolean isActive(){
-    return status.equals("active");
-  }
-
-  @Override
-  public String toString(){
-    StringBuilder buf = new StringBuilder();
-    buf.append("\n\tPortfolio [id=" + this.id + " status=" + this.status);
-    buf.append(" type=" + this.type);
-    boolean firstTime = true;
-    for (Map.Entry<String, Position> entry: positions.entrySet()) {
-      if (!firstTime) {
-        buf.append(", ");
-      }
-      buf.append("\n\t\t");
-      buf.append(entry.getKey() + ":" + entry.getValue());
-      firstTime = false;
-    }
-    buf.append("]");
-    return buf.toString();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Position.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Position.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Position.java
deleted file mode 100644
index 0f24cdb..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/org/apache/geode/spark/connector/Position.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector;
-
-import java.io.Serializable;
-import java.util.Properties;
-import org.apache.geode.cache.Declarable;
-
-/**
- * Represents a number of shares of a stock ("security") held in a {@link
- * Portfolio}.
- * </p>
- * This class is <code>Serializable</code> because we want it to be distributed
- * to multiple members of a distributed system.  Because this class is
- * <code>Declarable</code>, we can describe instances of it in a Geode
- * <code>cache.xml</code> file.
- * </p>
- *
- */
-public class Position implements Declarable, Serializable {
-
-  private static final long serialVersionUID = -8229531542107983344L;
-
-  private String secId;
-  private double qty;
-  private double mktValue;
-
-  public Position(Properties props) {
-    init(props);
-  }
-
-  @Override
-  public void init(Properties props) {
-    this.secId = props.getProperty("secId");
-    this.qty = Double.parseDouble(props.getProperty("qty"));
-    this.mktValue = Double.parseDouble(props.getProperty("mktValue"));
-  }
-
-  public String getSecId(){
-    return this.secId;
-  }
-
-  public double getQty(){
-    return this.qty;
-  }
-
-  public double getMktValue() {
-    return this.mktValue;
-  }
-
-  @Override
-  public String toString(){
-    return new StringBuilder()
-            .append("Position [secId=").append(secId)
-            .append(" qty=").append(this.qty)
-            .append(" mktValue=").append(mktValue).append("]").toString();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml b/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
deleted file mode 100644
index b92c1f3..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE cache PUBLIC
-  "-//GemStone Systems, Inc.//GemFire Declarative Caching 7.0//EN"
-  "http://www.gemstone.com/dtd/cache7_0.dtd" >
-
-<cache>
-  <!-- test region for OQL test -->
-  <region name="obj_obj_region" refid="PARTITION_REDUNDANT" />
-
-  <region name="obj_obj_rep_region" refid="REPLICATE" />
-
-  <region name="str_int_region" refid="PARTITION_REDUNDANT">
-    <region-attributes>
-      <key-constraint>java.lang.String</key-constraint>
-      <value-constraint>java.lang.Integer</value-constraint>
-    </region-attributes>
-  </region>
-
-  <region name="str_str_region" refid="PARTITION_REDUNDANT">
-    <region-attributes>
-      <key-constraint>java.lang.String</key-constraint>
-      <value-constraint>java.lang.String</value-constraint>
-    </region-attributes>
-  </region>
-
-  <region name="str_str_rep_region" refid="REPLICATE">
-    <region-attributes>
-      <key-constraint>java.lang.String</key-constraint>
-      <value-constraint>java.lang.String</value-constraint>
-    </region-attributes>
-  </region>
-</cache>

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml b/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
deleted file mode 100644
index 911a65d..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml
+++ /dev/null
@@ -1,57 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE cache PUBLIC
-  "-//GemStone Systems, Inc.//GemFire Declarative Caching 7.0//EN"
-  "http://www.gemstone.com/dtd/cache7_0.dtd" >
-
-<cache>
-  <!-- combinations of key, value types with region types -->
-  <region name="pr_r_obj_obj_region" refid="PARTITION_REDUNDANT" />
-  <region name="pr_obj_obj_region" refid="PARTITION" />
-  <region name="rr_obj_obj_region" refid="REPLICATE" />
-  <region name="rr_p_obj_obj_region" refid="REPLICATE_PERSISTENT" />
-
-  <region name="pr_r_str_int_region" refid="PARTITION_REDUNDANT">
-    <region-attributes>
-      <key-constraint>java.lang.String</key-constraint>
-      <value-constraint>java.lang.Integer</value-constraint>
-    </region-attributes>
-  </region>
-  
-  <region name="pr_str_int_region" refid="PARTITION">
-    <region-attributes>
-      <key-constraint>java.lang.String</key-constraint>
-      <value-constraint>java.lang.Integer</value-constraint>
-    </region-attributes>
-  </region>
-
-  <region name="rr_str_int_region" refid="REPLICATE">
-    <region-attributes>
-      <key-constraint>java.lang.String</key-constraint>
-      <value-constraint>java.lang.Integer</value-constraint>
-    </region-attributes>
-  </region>
-  
-  <region name="rr_p_str_int_region" refid="REPLICATE_PERSISTENT">
-    <region-attributes>
-      <key-constraint>java.lang.String</key-constraint>
-      <value-constraint>java.lang.Integer</value-constraint>
-    </region-attributes>
-  </region>
-</cache>

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/BasicIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/BasicIntegrationTest.scala
deleted file mode 100644
index c057e1d..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/BasicIntegrationTest.scala
+++ /dev/null
@@ -1,598 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector
-
-import java.util.Properties
-import org.apache.geode.cache.query.QueryService
-import org.apache.geode.cache.query.internal.StructImpl
-import org.apache.geode.spark.connector._
-import org.apache.geode.cache.Region
-import org.apache.geode.spark.connector.internal.{RegionMetadata, DefaultGeodeConnectionManager}
-import org.apache.geode.spark.connector.internal.oql.{RDDConverter, QueryRDD}
-import ittest.org.apache.geode.spark.connector.testkit.GeodeCluster
-import ittest.org.apache.geode.spark.connector.testkit.IOUtils
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream}
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import scala.collection.JavaConversions
-import scala.reflect.ClassTag
-
-case class Number(str: String, len: Int)
-
-class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
-
-  var sc: SparkContext = null
-
-  override def beforeAll() {
-    // start geode cluster, and spark context
-    val settings = new Properties()
-    settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml")
-    settings.setProperty("num-of-servers", "2")
-    val locatorPort = GeodeCluster.start(settings)
-
-    // start spark context in local mode
-    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
-                            "log4j.logger.org.apache.geode.spark.connector" -> "DEBUG")
-    val conf = new SparkConf()
-      .setAppName("BasicIntegrationTest")
-      .setMaster("local[2]")
-      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-      .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryo.registrator", "org.apache.geode.spark.connector.GeodeKryoRegistrator")
-
-    sc = new SparkContext(conf)
-  }
-
-  override def afterAll() {
-    // stop connection, spark context, and geode cluster
-    DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
-    sc.stop()
-    GeodeCluster.stop()
-  }
-
-  //Convert Map[Object, Object] to java.util.Properties
-  private def map2Props(map: Map[Object, Object]): java.util.Properties =
-    (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
-
-  // ===========================================================
-  //       DefaultGeodeConnection functional tests
-  // ===========================================================
-
-  test("DefaultGeodeConnection.validateRegion()") {
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-
-    // normal exist-region
-    var regionPath: String = "str_str_region"
-    conn.validateRegion[String, String](regionPath)
-
-    // non-exist region
-    regionPath = "non_exist_region"
-    try {
-      conn.validateRegion[String, String](regionPath)
-      fail("validateRegion failed to catch non-exist region error")
-    } catch {
-      case e: RuntimeException => 
-        if (! e.getMessage.contains(s"The region named $regionPath was not found"))
-          fail("validateRegion gives wrong exception on non-exist region", e)
-      case e: Throwable => 
-        fail("validateRegion gives wrong exception on non-exist region", e)
-    }
-
-    // Note: currently, can't catch type mismatch error
-    conn.validateRegion[String, Integer]("str_str_region")
-  }
-
-  test("DefaultGeodeConnection.getRegionMetadata()") {
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-
-    // exist region
-    validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false)
-    validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false)
-    validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true)
-
-    // non-exist region
-    assert(! conn.getRegionMetadata("no_exist_region").isDefined)
-  }
-    
-  def validateRegionMetadata(
-    conn: GeodeConnection, regionPath: String, partitioned: Boolean, buckets: Int,
-    keyType: String, valueType: String, emptyMap: Boolean): Unit = {
-
-    val mdOption = conn.getRegionMetadata(regionPath)
-    val md = mdOption.get
-   
-    assert(md.getRegionPath == s"/$regionPath")
-    assert(md.isPartitioned == partitioned)
-    assert(md.getKeyTypeName == keyType)
-    assert(md.getValueTypeName == valueType)
-    assert(md.getTotalBuckets == buckets)
-    if (emptyMap) assert(md.getServerBucketMap == null) 
-    else assert(md.getServerBucketMap != null)
-  }
-
-  test("DefaultGeodeConnection.getRegionProxy()") {
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-
-    val region1 = conn.getRegionProxy[String, String]("str_str_region")
-    region1.put("1", "One")
-    assert(region1.get("1") == "One")
-    region1.remove("1")
-    assert(region1.get("1") == null)
-    
-    // getRegionProxy doesn't fail when region doesn't exist
-    val region2 = conn.getRegionProxy[String, String]("non_exist_region")
-    try {
-      region2.put("1", "One")
-      fail("getRegionProxy failed to catch non-exist region error")
-    } catch {
-      case e: Exception =>
-        if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region named /non_exist_region was not found")) {
-          e.printStackTrace()
-          fail("validateRegion gives wrong exception on non-exist region", e)
-        }
-    }
-  }
-  
-  // Note: DefaultGeodeConnecton.getQuery() and getRegionData() are covered by
-  //       RetrieveRegionIntegrationTest.scala and following OQL tests.
-  
-  // ===========================================================
-  //                OQL functional tests
-  // ===========================================================
-  
-  private def initRegion(regionName: String): Unit = {
-
-    //Populate some data in the region
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-
-    //This will call the implicit conversion map2Properties in connector package object, since it is Map[String, String]
-    var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42"))
-    var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29"))
-    val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("1", portfolio1)
-
-    position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925"))
-    position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972"))
-    val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("2", portfolio2)
-
-    position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
-    position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
-    val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("3", portfolio3)
-
-    position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572"))
-    position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34"))
-    val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("4", portfolio4)
-
-    position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572"))
-    position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34"))
-    val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("5", portfolio5)
-
-    position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572"))
-    position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34"))
-    val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive",
-      "position1" -> position1, "position2" -> position2)))
-    rgn.put("6", portfolio6)
-
-    position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
-    position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
-    val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active",
-      "position1" -> position1, "position2" -> position2)))
-    //Not using null, due to intermittent query failure on column containing null, likely a Spark SQL bug
-    //portfolio7.setType(null)
-    rgn.put("7", portfolio7)
-  }
-
-  private def getQueryRDD[T: ClassTag](
-    query: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)): QueryRDD[T] =
-      new QueryRDD[T](sc, query, connConf)
-
-  test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Partitioned Region") {
-    simpleQuery("obj_obj_region")
-  }
-
-  test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") {
-    simpleQuery("obj_obj_rep_region")
-  }
-
-  private def simpleQuery(regionName: String) {
-    //Populate some data in the region
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, String] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
-
-    //Create QueryRDD using OQL
-    val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName")
-
-    //verify the QueryRDD
-    val oqlRS: Array[String] = OQLResult.collect()
-    oqlRS should have length 3
-    oqlRS should contain theSameElementsAs List("one", "two", "three")
-
-    //Convert QueryRDD to DataFrame
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    // this is used to implicitly convert an RDD to a DataFrame.
-    import sqlContext.implicits._
-    val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF()
-    //Register dataFrame as a table of two columns of type String and Int respectively
-    dataFrame.registerTempTable("numberTable")
-
-    //Issue SQL query against the table
-    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
-    //Verify the SQL query result, r(0) mean column 0
-    val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
-    sqlRS should have length 3
-    sqlRS should contain theSameElementsAs List("one", "two", "three")
-
-    //Convert QueryRDD to DataFrame using RDDConverter
-    val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
-    //Register dataFrame2 as a table of two columns of type String and Int respectively
-    dataFrame2.registerTempTable("numberTable2")
-
-    //Issue SQL query against the table
-    val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2")
-    //Verify the SQL query result, r(0) mean column 0
-    val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect()
-    sqlRS2 should have length 3
-    sqlRS2 should contain theSameElementsAs List("one", "two", "three")
-
-    //Remove the region entries, because other tests might use the same region as well
-    List("1", "2", "3").foreach(rgn.remove)
-  }
-
-  test("Run Geode OQL query and directly return DataFrame: Partitioned Region") {
-    simpleQueryDataFrame("obj_obj_region")
-  }
-
-  test("Run Geode OQL query and directly return DataFrame: Replicated Region") {
-    simpleQueryDataFrame("obj_obj_rep_region")
-  }
-
-  private def simpleQueryDataFrame(regionName: String) {
-    //Populate some data in the region
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[String, String] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"select * from /$regionName")
-    dataFrame.registerTempTable("numberTable")
-
-    //Issue SQL query against the table
-    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
-    //Verify the SQL query result, r(0) mean column 0
-    val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
-    sqlRS should have length 3
-    sqlRS should contain theSameElementsAs List("one", "two", "three")
-
-    //Remove the region entries, because other tests might use the same region as well
-    List("1", "2", "3").foreach(rgn.remove)
-  }
-
-  test("Geode OQL query with UDT: Partitioned Region") {
-    queryUDT("obj_obj_region")
-  }
-
-  test("Geode OQL query with UDT: Replicated Region") {
-    queryUDT("obj_obj_rep_region")
-  }
-
-  private def queryUDT(regionName: String) {
-
-    //Populate some data in the region
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    val e1: Employee = new Employee("hello", 123)
-    val e2: Employee = new Employee("world", 456)
-    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
-
-    //Create QueryRDD using OQL
-    val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName")
-
-    //verify the QueryRDD
-    val oqlRS: Array[Object] = OQLResult.collect()
-    oqlRS should have length 2
-    oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456)
-
-    //Convert QueryRDD to DataFrame
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
-    //Convert QueryRDD to DataFrame using RDDConverter
-    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
-    dataFrame.registerTempTable("employee")
-    val SQLResult = sqlContext.sql("SELECT * FROM employee")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.map(r => r(0)).collect()
-    sqlRS should have length 2
-    sqlRS should contain theSameElementsAs List("hello", "world")
-
-    List("1", "2").foreach(rgn.remove)
-  }
-
-  test("Geode OQL query with UDT and directly return DataFrame: Partitioned Region") {
-    queryUDTDataFrame("obj_obj_region")
-  }
-
-  test("Geode OQL query with UDT and directly return DataFrame: Replicated Region") {
-    queryUDTDataFrame("obj_obj_rep_region")
-  }
-
-  private def queryUDTDataFrame(regionName: String) {
-    //Populate some data in the region
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    val e1: Employee = new Employee("hello", 123)
-    val e2: Employee = new Employee("world", 456)
-    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"select name, age from /$regionName")
-
-    dataFrame.registerTempTable("employee")
-    val SQLResult = sqlContext.sql("SELECT * FROM employee")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.map(r => r(0)).collect()
-    sqlRS should have length 2
-    sqlRS should contain theSameElementsAs List("hello", "world")
-
-    List("1", "2").foreach(rgn.remove)
-  }
-
-  test("Geode OQL query with more complex UDT: Partitioned Region") {
-    complexUDT("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT: Replicated Region") {
-    complexUDT("obj_obj_rep_region")
-  }
-
-  private def complexUDT(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create QueryRDD using OQL
-    val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
-
-    //verify the QueryRDD
-    val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId)
-    oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
-
-    //Convert QueryRDD to DataFrame
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
-    //Convert QueryRDD to DataFrame using RDDConverter
-    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
-
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
-    sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
-  }
-
-  test("Geode OQL query with more complex UDT and directly return DataFrame: Partitioned Region") {
-    complexUDTDataFrame("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT and directly return DataFrame: Replicated Region") {
-    complexUDTDataFrame("obj_obj_rep_region")
-  }
-
-  private def complexUDTDataFrame(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
-    sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
-  }
-
-  test("Geode OQL query with more complex UDT with Projection: Partitioned Region") {
-    queryComplexUDTProjection("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT with Projection: Replicated Region") {
-    queryComplexUDTProjection("obj_obj_rep_region")
-  }
-
-  private def queryComplexUDTProjection(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create QueryRDD using OQL
-    val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
-
-    //verify the QueryRDD
-    val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int])
-    oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
-
-    //Convert QueryRDD to DataFrame
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
-    //Convert QueryRDD to DataFrame using RDDConverter
-    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
-
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0))
-    sqlRS should contain theSameElementsAs List(3)
-  }
-
-  test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") {
-    queryComplexUDTProjectionDataFrame("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") {
-    queryComplexUDTProjectionDataFrame("obj_obj_rep_region")
-  }
-
-  private def queryComplexUDTProjectionDataFrame(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0))
-    sqlRS should contain theSameElementsAs List(3)
-  }
-
-  test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") {
-    queryComplexUDTNestProjectionDataFrame("obj_obj_region")
-  }
-
-  test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") {
-    queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region")
-  }
-
-  private def queryComplexUDTNestProjectionDataFrame(regionName: String) {
-
-    initRegion(regionName)
-
-    //Create DataFrame using Geode OQL
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""")
-    dataFrame.registerTempTable("Portfolio")
-
-    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
-
-    //Verify the SQL query result
-    val sqlRS = SQLResult.collect().map(r => r(0))
-    sqlRS should contain theSameElementsAs List(3)
-  }
-
-  test("Undefined instance deserialization: Partitioned Region") {
-    undefinedInstanceDeserialization("obj_obj_region")
-  }
-
-  test("Undefined instance deserialization: Replicated Region") {
-    undefinedInstanceDeserialization("obj_obj_rep_region")
-  }
-
-  private def undefinedInstanceDeserialization(regionName: String) {
-
-    val conn = GeodeConnectionConf(sc.getConf).getConnection
-    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-
-    //Put some new data
-    rgn.put("1", "one")
-
-    //Query some non-existent columns, which should return UNDEFINED
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-    val dataFrame = sqlContext.geodeOQL(s"SELECT col100, col200 FROM /$regionName")
-    val col1 = dataFrame.first().apply(0)
-    val col2 = dataFrame.first().apply(1)
-    assert(col1 == QueryService.UNDEFINED)
-    assert(col2 == QueryService.UNDEFINED)
-    //Verify that col1 and col2 refer to the same Undefined object
-    assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef])
-  }
-
-  test("RDD.saveToGeode") {
-    val regionName = "str_str_region"
-    // generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66))
-    val data = (1 to 6).map(_.toString).map(e=> (e, e*2))
-    val rdd = sc.parallelize(data)
-    rdd.saveToGeode(regionName)
-
-    // verify
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName)
-    println("region key set on server: " + region.keySetOnServer())
-    assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
-    (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e)))
-  }
-
-  // ===========================================================
-  //        DStream.saveToGeode() functional tests
-  // ===========================================================
-
-  test("Basic DStream test") {
-    import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
-    import org.apache.geode.spark.connector.streaming._
-    import org.apache.spark.streaming.ManualClockHelper
-
-    class TestStreamListener extends StreamingListener {
-      var count = 0
-      override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1
-    }
-
-    def batchDuration = Seconds(1)
-    val ssc = new StreamingContext(sc, batchDuration)
-    val input = Seq(1 to 4, 5 to 8, 9 to 12)
-    val dstream = new TestInputDStream(ssc, input, 2)
-    dstream.saveToGeode[String, Int]("str_int_region", (e: Int) => (e.toString, e))
-    try {
-      val listener = new TestStreamListener
-      ssc.addStreamingListener(listener)
-      ssc.start()
-      ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length)
-      while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50)
-    } catch {
-      case e: Exception => e.printStackTrace(); throw e
-//    } finally {
-//      ssc.stop()
-    }
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val region: Region[String, Int] = conn.getRegionProxy("str_int_region")
-
-    // verify geode region contents
-    println("region key set on server: " + region.keySetOnServer())
-    assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
-    (1 to 12).foreach(e => assert(e == region.get(e.toString)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RDDJoinRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
deleted file mode 100644
index 1688345..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RDDJoinRegionIntegrationTest.scala
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector
-
-import java.util.Properties
-
-import org.apache.geode.spark.connector._
-import org.apache.geode.cache.Region
-import org.apache.geode.spark.connector.internal.DefaultGeodeConnectionManager
-import ittest.org.apache.geode.spark.connector.testkit.GeodeCluster
-import ittest.org.apache.geode.spark.connector.testkit.IOUtils
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import java.util.{HashMap => JHashMap}
-
-class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
-
-  var sc: SparkContext = null
-  val numServers = 3
-  val numObjects = 1000
-
-  override def beforeAll() {
-    // start geode cluster, and spark context
-    val settings = new Properties()
-    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
-    settings.setProperty("num-of-servers", numServers.toString)
-    val locatorPort = GeodeCluster.start(settings)
-
-    // start spark context in local mode
-    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
-      "log4j.logger.org.apache.geode.spark.connector" -> "DEBUG")
-    val conf = new SparkConf()
-      .setAppName("RDDJoinRegionIntegrationTest")
-      .setMaster("local[2]")
-      .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
-    sc = new SparkContext(conf)
-  }
-
-  override def afterAll() {
-    // stop connection, spark context, and geode cluster
-    DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
-    sc.stop()
-    GeodeCluster.stop()
-  }
-
-//  def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
-//    assert(map1.size == map2.size)
-//    map1.foreach(e => {
-//      assert(map2.contains(e._1))
-//      assert (e._2 == map2.get(e._1).get)
-//    })
-//  }
-  
-  // -------------------------------------------------------------------------------------------- 
-  // PairRDD.joinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2]
-  // -------------------------------------------------------------------------------------------- 
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", JoinTest) {
-    verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region")
-  }
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", JoinTest) {
-    verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region")
-  }
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", JoinTest) {
-    verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region")
-  }
-
-  def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => ("k_" + x, x*2))
-    val rdd = sc.parallelize(data)
-
-    val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, connConf)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (0 until 50).map(i => ((s"k_$i", i*2), i)).toMap
-    // matchMaps[(String, Int), Int](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // ------------------------------------------------------------------------------------------------------
-  // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2]
-  // -------------------------------------------------------------------------------------------------------
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", JoinTest) {
-    verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region")
-  }
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", JoinTest) {
-    verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region")
-  }
-
-  test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", JoinTest) {
-    verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region")
-  }
-
-  def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => (x, x*2))
-    val rdd = sc.parallelize(data)
-
-    val func :((Int, Int)) => String = pair => s"k_${pair._1}"
-
-    val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, func /*, connConf*/)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (0 until 50).map(i => ((i, i*2), i)).toMap
-    // matchMaps[(Int, Int), Int](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // ------------------------------------------------------------------------------------------------ 
-  // PairRDD.outerJoinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2]
-  // ------------------------------------------------------------------------------------------------ 
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region")
-  }
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region")
-  }
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region")
-  }
-
-  def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => ("k_" + x, x*2))
-    val rdd = sc.parallelize(data)
-
-    val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath /*, connConf*/)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (-5 until 50).map {
-      i => if (i < 0) ((s"k_$i", i * 2), None)
-      else ((s"k_$i", i*2), Some(i))}.toMap
-    // matchMaps[(String, Int), Option[Int]](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // ------------------------------------------------------------------------------------------------------
-  // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2]
-  // -------------------------------------------------------------------------------------------------------
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region")
-  }
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region")
-  }
-
-  test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", OuterJoinTest) {
-    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region")
-  }
-
-  def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => (x, x*2))
-    val rdd = sc.parallelize(data)
-
-    val func :((Int, Int)) => String = pair => s"k_${pair._1}"
-
-    val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, func, connConf)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (-5 until 50).map {
-      i => if (i < 0) ((i, i * 2), None)
-      else ((i, i*2), Some(i))}.toMap
-    // matchMaps[(Int, Int), Option[Int]](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // --------------------------------------------------------------------------------------------
-  // RDD.joinGeodeRegion[K, V](regionPath, T => K,  connConf): GeodeJoinRDD[T, K, V]
-  // --------------------------------------------------------------------------------------------
-
-  test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], replicated region", JoinTest) {
-    verifyRDDJoinRegion("rr_str_int_region")
-  }
-
-  test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned region", JoinTest) {
-    verifyRDDJoinRegion("pr_str_int_region")
-  }
-
-  test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", JoinTest) {
-    verifyRDDJoinRegion("pr_r_str_int_region")
-  }
-
-  def verifyRDDJoinRegion(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => s"k_$x")
-    val rdd = sc.parallelize(data)
-
-    val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, x => x, connConf)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap
-    // matchMaps[String, Int](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-
-  // --------------------------------------------------------------------------------------------
-  // RDD.outerJoinGeodeRegion[K, V](regionPath, T => K, connConf): GeodeJoinRDD[T, K, V]
-  // --------------------------------------------------------------------------------------------
-
-  test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], replicated region", OnlyTest) {
-    verifyRDDOuterJoinRegion("rr_str_int_region")
-  }
-
-  test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned region", OnlyTest) {
-    verifyRDDOuterJoinRegion("pr_str_int_region")
-  }
-
-  test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", OnlyTest) {
-    verifyRDDOuterJoinRegion("pr_r_str_int_region")
-  }
-
-  def verifyRDDOuterJoinRegion(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val data = (-5 until 50).map(x => s"k_$x")
-    val rdd = sc.parallelize(data)
-
-    val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, x => x /*, connConf */)
-    val rdd2Content = rdd2.collect()
-
-    val expectedMap = (-5 until 50).map {
-      i => if (i < 0) (s"k_$i", None)
-           else (s"k_$i", Some(i))}.toMap
-    // matchMaps[String, Option[Int]](expectedMap, rdd2Content.toMap)
-    assert(expectedMap == rdd2Content.toMap)
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RetrieveRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RetrieveRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RetrieveRegionIntegrationTest.scala
deleted file mode 100644
index 7c441a3..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RetrieveRegionIntegrationTest.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector
-
-import java.util.Properties
-
-import org.apache.geode.spark.connector._
-import org.apache.geode.cache.Region
-import org.apache.geode.spark.connector.internal.DefaultGeodeConnectionManager
-import ittest.org.apache.geode.spark.connector.testkit.GeodeCluster
-import ittest.org.apache.geode.spark.connector.testkit.IOUtils
-import org.apache.spark.{SparkContext, SparkConf}
-import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers}
-import java.util.{HashMap => JHashMap}
-
-
-class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster {
-
-  var sc: SparkContext = null
-  val numServers = 4
-  val numObjects = 1000
-
-  override def beforeAll() {
-    // start geode cluster, and spark context
-    val settings = new Properties()
-    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
-    settings.setProperty("num-of-servers", numServers.toString)
-    val locatorPort = GeodeCluster.start(settings)
-
-    // start spark context in local mode
-    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
-                            "log4j.logger.org.apache.geode.spark.connector" -> "DEBUG")
-    val conf = new SparkConf()
-      .setAppName("RetrieveRegionIntegrationTest")
-      .setMaster("local[2]")
-      .set(GeodeLocatorPropKey, s"localhost[$locatorPort]")
-    sc = new SparkContext(conf)
-  }
-
-  override def afterAll() {
-    // stop connection, spark context, and geode cluster
-    DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf))
-    sc.stop()
-    GeodeCluster.stop()
-  }
-  
-  def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = {
-    //Populate some data in the region
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[K, V] = conn.getRegionProxy(regionName)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-    verifyRetrieveRegion[K,V](regionName, entriesMap)
-  }
-    
-  def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V])  = {
-    val rdd = sc.geodeRegion(regionName)
-    val collectedObjs = rdd.collect()
-    collectedObjs should have length entriesMap.size
-    import scala.collection.JavaConverters._
-    matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap)
-  }
- 
-  def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
-    assert(map1.size == map2.size)
-    map1.foreach(e => {
-      assert(map2.contains(e._1))
-      assert (e._2 == map2.get(e._1).get)
-      }
-    )
-  }
-  
-  //Retrieve region for Partitioned Region where some nodes are empty (empty iterator)
-  //This test has to run first...the rest of the tests always use the same num objects
-  test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") {
-    val numObjects = numServers - 1
-    val entriesMap:JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
-    executeTest[String, Int]("rr_str_int_region", numObjects, entriesMap)
-  }
-
-  //Test for retrieving from region containing string key and int value
-  def verifyRetrieveStringStringRegion(regionName:String) = {
-    val entriesMap:JHashMap[String, String] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, "value_" + i))
-    executeTest[String, String](regionName, numObjects, entriesMap)
-  }
-
-  test("Retrieve Region with replicate redundant string string") {
-    verifyRetrieveStringStringRegion("rr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned string string") {
-    verifyRetrieveStringStringRegion("pr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned redundant string string") {
-    verifyRetrieveStringStringRegion("pr_r_obj_obj_region")
-  }
-  
-
-  //Test for retrieving from region containing string key and string value
-  def verifyRetrieveStringIntRegion(regionName:String) = {
-    val entriesMap:JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
-    executeTest[String, Int](regionName, numObjects, entriesMap)
-  }
-
-  test("Retrieve Region with replicate string int region") {
-    verifyRetrieveStringIntRegion("rr_str_int_region")
-  }
-
-  test("Retrieve Region with partitioned string int region") {
-    verifyRetrieveStringIntRegion("pr_str_int_region")
-  }
-
-  test("Retrieve Region with partitioned redundant string int region") {
-    verifyRetrieveStringIntRegion("pr_r_str_int_region")
-  }
-
-  //Tests for retrieving from region containing string key and object value
-  def verifyRetrieveStringObjectRegion(regionName:String) = {
-    val entriesMap:JHashMap[String, Object] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, new Employee("ename" + i, i)))
-    executeTest[String, Object](regionName, numObjects, entriesMap)
-  }
-
-  test("Retrieve Region with replicate string obj") {
-    verifyRetrieveStringObjectRegion("rr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned string obj") {
-    verifyRetrieveStringObjectRegion("pr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned redundant string obj") {
-    verifyRetrieveStringObjectRegion("pr_r_obj_obj_region")
-  }
-
-  //Test for retrieving from region containing string key and map value
-  def verifyRetrieveStringMapRegion(regionName:String) = {
-    val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap()
-    (0 until numObjects).map(i => {
-      val hashMap:JHashMap[String, String] = new JHashMap()
-      hashMap.put("mapKey:" + i, "mapValue:" + i)
-      entriesMap.put("key_" + i, hashMap)
-    })
-    executeTest(regionName, numObjects, entriesMap)
-  }
-
-  test("Retrieve Region with replicate string map region") {
-    verifyRetrieveStringMapRegion("rr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned string map region") {
-    verifyRetrieveStringMapRegion("pr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partitioned redundant string map region") {
-    verifyRetrieveStringMapRegion("pr_r_obj_obj_region")
-  }
-  
-  //Test and helpers specific for retrieving from region containing string key and byte[] value
-  def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = {
-    //Populate some data in the region
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[K, Array[Byte]] = conn.getRegionProxy(regionName)
-    rgn.putAll(entriesMap)
-    verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap)
-  }
-  
-  def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]])  = {
-    val rdd = sc.geodeRegion(regionName)
-    val collectedObjs = rdd.collect()
-    collectedObjs should have length entriesMap.size
-    import scala.collection.JavaConverters._
-    matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap)
-  }
-  
-  def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = {
-    map1.foreach(e => {
-      assert(map2.contains(e._1))
-      assert (java.util.Arrays.equals(e._2, map2.get(e._1).get))
-      }
-    )
-    assert(map1.size == map2.size)
-
-  }
-  
-  def verifyRetrieveStringByteArrayRegion(regionName:String) = {
-    val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte)))
-    executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap)
-  }
-      
-  test("Retrieve Region with replicate region string byte[] region") {
-    verifyRetrieveStringByteArrayRegion("rr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partition region string byte[] region") {
-    verifyRetrieveStringByteArrayRegion("pr_obj_obj_region")
-  }
-
-  test("Retrieve Region with partition redundant region string byte[] region") {
-    verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region")
-  }
-
-  test("Retrieve Region with where clause on partitioned redundant region", FilterTest) {
-    verifyRetrieveRegionWithWhereClause("pr_r_str_int_region")
-  }
-
-  test("Retrieve Region with where clause on partitioned region", FilterTest) {
-    verifyRetrieveRegionWithWhereClause("pr_str_int_region")
-  }
-
-  test("Retrieve Region with where clause on replicated region", FilterTest) {
-    verifyRetrieveRegionWithWhereClause("rr_str_int_region")
-  }
-
-  def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = {
-    val entriesMap: JHashMap[String, Int] = new JHashMap()
-    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
-
-    val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)
-    val conn = connConf.getConnection
-    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
-    rgn.removeAll(rgn.keySetOnServer())
-    rgn.putAll(entriesMap)
-
-    val rdd = sc.geodeRegion(regionPath).where("value.intValue() < 50")
-    val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap
-    val collectedObjs = rdd.collect()
-    // collectedObjs should have length expectedMap.size
-    matchMaps[String, Int](expectedMap, collectedObjs.toMap)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/package.scala
deleted file mode 100644
index fb379b4..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark
-
-import org.scalatest.Tag
-
-package object connector {
-
-  object OnlyTest extends Tag("OnlyTest")
-  object FetchDataTest extends Tag("FetchDateTest")
-  object FilterTest extends Tag("FilterTest")
-  object JoinTest extends Tag("JoinTest")
-  object OuterJoinTest extends Tag("OuterJoinTest")  
-  
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/b27a79ae/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeCluster.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeCluster.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeCluster.scala
deleted file mode 100644
index dd31bfe..0000000
--- a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeCluster.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ittest.org.apache.geode.spark.connector.testkit
-
-import java.util.Properties
-
-trait GeodeCluster {
-  def startGeodeCluster(settings: Properties): Int = {
-    println("=== GeodeCluster start()")
-    GeodeCluster.start(settings)
-  }
-}
-
-object GeodeCluster {
-  private var geode: Option[GeodeRunner] = None
-
-  def start(settings: Properties): Int = {
-    geode.map(_.stopGeodeCluster()) // Clean up any old running Geode instances
-    val runner = new GeodeRunner(settings)
-    geode = Some(runner)
-    runner.getLocatorPort
-  }
-
-  def stop(): Unit = {
-    println("=== GeodeCluster shutdown: " + geode.toString)
-    geode match {
-      case None => println("Nothing to shutdown.")
-      case Some(runner) => runner.stopGeodeCluster()
-    }
-    geode = None
-    println("=== GeodeCluster shutdown finished.")
-  }
-}


Mime
View raw message