Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 48A0E19121 for ; Thu, 21 Apr 2016 17:17:17 +0000 (UTC) Received: (qmail 33603 invoked by uid 500); 21 Apr 2016 17:17:17 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 33565 invoked by uid 500); 21 Apr 2016 17:17:17 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 33477 invoked by uid 99); 21 Apr 2016 17:17:17 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Apr 2016 17:17:17 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 78527C05EF for ; Thu, 21 Apr 2016 17:17:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id jQDQThRpjkLP for ; Thu, 21 Apr 2016 17:16:41 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 90A4C5FAC3 for ; Thu, 21 Apr 2016 17:16:38 +0000 (UTC) Received: (qmail 29724 invoked by uid 99); 21 Apr 2016 17:16:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Apr 2016 17:16:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B5B9ADFB7C; Thu, 21 Apr 2016 17:16:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jinmeiliao@apache.org To: commits@geode.incubator.apache.org Date: Thu, 21 Apr 2016 17:17:23 -0000 Message-Id: <71988c9f5f2c44d6b750ab08337690ed@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [48/50] [abbrv] incubator-geode git commit: GEODE-1244: Package, directory, project and file rename for geode-spark-connector http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java deleted file mode 100644 index bb75c7a..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector; - -import com.gemstone.gemfire.cache.Region; -import io.pivotal.gemfire.spark.connector.GemFireConnection; -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf$; -import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager$; -import io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaRegionRDD; -import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster$; -import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.scalatest.junit.JUnitSuite; -import io.pivotal.gemfire.spark.connector.package$; -import scala.Tuple2; -import scala.Option; -import scala.Some; - -import java.util.*; - -import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.RDDSaveBatchSizePropKey; -import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.javaFunctions; -import static org.junit.Assert.*; - -public class JavaApiIntegrationTest extends JUnitSuite { - - static JavaSparkContext jsc = null; - static GemFireConnectionConf connConf = null; - - static int numServers = 2; - static int numObjects = 1000; - static String regionPath = "pr_str_int_region"; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // start gemfire cluster, and spark context - Properties settings = new Properties(); - settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml"); - settings.setProperty("num-of-servers", Integer.toString(numServers)); - int locatorPort = GemFireCluster$.MODULE$.start(settings); - - // start spark context in local mode - Properties props = new Properties(); - props.put("log4j.logger.org.apache.spark", "INFO"); - props.put("log4j.logger.io.pivotal.gemfire.spark.connector","DEBUG"); - IOUtils.configTestLog4j("ERROR", props); - SparkConf conf = new SparkConf() - .setAppName("RetrieveRegionIntegrationTest") - .setMaster("local[2]") - .set(package$.MODULE$.GemFireLocatorPropKey(), "localhost:"+ locatorPort); - // sc = new SparkContext(conf); - jsc = new JavaSparkContext(conf); - connConf = GemFireConnectionConf.apply(jsc.getConf()); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - // stop connection, spark context, and gemfire cluster - DefaultGemFireConnectionManager$.MODULE$.closeConnection(GemFireConnectionConf$.MODULE$.apply(jsc.getConf())); - jsc.stop(); - GemFireCluster$.MODULE$.stop(); - } - - // -------------------------------------------------------------------------------------------- - // utility methods - // -------------------------------------------------------------------------------------------- - - private void matchMapAndPairList(Map map, List> list) { - assertTrue("size mismatch \nmap: " + map.toString() + "\nlist: " + list.toString(), map.size() == list.size()); - for (Tuple2 p : list) { - assertTrue("value mismatch: k=" + p._1() + " v1=" + p._2() + " v2=" + map.get(p._1()), - p._2().equals(map.get(p._1()))); - } - } - - private Region prepareStrIntRegion(String regionPath, int start, int stop) { - HashMap entriesMap = new HashMap<>(); - for (int i = start; i < stop; i ++) { - entriesMap.put("k_" + i, i); - } - - GemFireConnection conn = connConf.getConnection(); - Region region = conn.getRegionProxy(regionPath); - region.removeAll(region.keySetOnServer()); - region.putAll(entriesMap); - return region; - } - - private JavaPairRDD prepareStrIntJavaPairRDD(int start, int stop) { - List> data = new ArrayList<>(); - for (int i = start; i < stop; i ++) { - data.add(new Tuple2<>("k_" + i, i)); - } - return jsc.parallelizePairs(data); - } - - private JavaPairRDD prepareIntIntJavaPairRDD(int start, int stop) { - List> data = new ArrayList<>(); - for (int i = start; i < stop; i ++) { - data.add(new Tuple2<>(i, i * 2)); - } - return jsc.parallelizePairs(data); - } - - private JavaRDD prepareIntJavaRDD(int start, int stop) { - List data = new ArrayList<>(); - for (int i = start; i < stop; i ++) { - data.add(i); - } - return jsc.parallelize(data); - } - - // -------------------------------------------------------------------------------------------- - // JavaRDD.saveToGemfire - // -------------------------------------------------------------------------------------------- - - static class IntToStrIntPairFunction implements PairFunction { - @Override public Tuple2 call(Integer x) throws Exception { - return new Tuple2<>("k_" + x, x); - } - } - - @Test - public void testRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws Exception { - verifyRDDSaveToGemfire(true, true); - } - - @Test - public void testRDDSaveToGemfireWithDefaultConnConf() throws Exception { - verifyRDDSaveToGemfire(true, false); - } - - @Test - public void testRDDSaveToGemfireWithConnConfAndOpConf() throws Exception { - verifyRDDSaveToGemfire(false, true); - } - - @Test - public void testRDDSaveToGemfireWithConnConf() throws Exception { - verifyRDDSaveToGemfire(false, false); - } - - public void verifyRDDSaveToGemfire(boolean useDefaultConnConf, boolean useOpConf) throws Exception { - Region region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries - JavaRDD rdd1 = prepareIntJavaRDD(0, numObjects); - - PairFunction func = new IntToStrIntPairFunction(); - Properties opConf = new Properties(); - opConf.put(RDDSaveBatchSizePropKey, "200"); - - if (useDefaultConnConf) { - if (useOpConf) - javaFunctions(rdd1).saveToGemfire(regionPath, func, opConf); - else - javaFunctions(rdd1).saveToGemfire(regionPath, func); - } else { - if (useOpConf) - javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf, opConf); - else - javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf); - } - - Set keys = region.keySetOnServer(); - Map map = region.getAll(keys); - - List> expectedList = new ArrayList<>(); - - for (int i = 0; i < numObjects; i ++) { - expectedList.add(new Tuple2<>("k_" + i, i)); - } - matchMapAndPairList(map, expectedList); - } - - // -------------------------------------------------------------------------------------------- - // JavaPairRDD.saveToGemfire - // -------------------------------------------------------------------------------------------- - - @Test - public void testPairRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws Exception { - verifyPairRDDSaveToGemfire(true, true); - } - - @Test - public void testPairRDDSaveToGemfireWithDefaultConnConf() throws Exception { - verifyPairRDDSaveToGemfire(true, false); - } - - @Test - public void testPairRDDSaveToGemfireWithConnConfAndOpConf() throws Exception { - verifyPairRDDSaveToGemfire(false, true); - } - - @Test - public void testPairRDDSaveToGemfireWithConnConf() throws Exception { - verifyPairRDDSaveToGemfire(false, false); - } - - public void verifyPairRDDSaveToGemfire(boolean useDefaultConnConf, boolean useOpConf) throws Exception { - Region region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries - JavaPairRDD rdd1 = prepareStrIntJavaPairRDD(0, numObjects); - Properties opConf = new Properties(); - opConf.put(RDDSaveBatchSizePropKey, "200"); - - if (useDefaultConnConf) { - if (useOpConf) - javaFunctions(rdd1).saveToGemfire(regionPath, opConf); - else - javaFunctions(rdd1).saveToGemfire(regionPath); - } else { - if (useOpConf) - javaFunctions(rdd1).saveToGemfire(regionPath, connConf, opConf); - else - javaFunctions(rdd1).saveToGemfire(regionPath, connConf); - } - - Set keys = region.keySetOnServer(); - Map map = region.getAll(keys); - - List> expectedList = new ArrayList<>(); - for (int i = 0; i < numObjects; i ++) { - expectedList.add(new Tuple2<>("k_" + i, i)); - } - matchMapAndPairList(map, expectedList); - } - - // -------------------------------------------------------------------------------------------- - // JavaSparkContext.gemfireRegion and where clause - // -------------------------------------------------------------------------------------------- - - @Test - public void testJavaSparkContextGemfireRegion() throws Exception { - prepareStrIntRegion(regionPath, 0, numObjects); // remove all entries - Properties emptyProps = new Properties(); - GemFireJavaRegionRDD rdd1 = javaFunctions(jsc).gemfireRegion(regionPath); - GemFireJavaRegionRDD rdd2 = javaFunctions(jsc).gemfireRegion(regionPath, emptyProps); - GemFireJavaRegionRDD rdd3 = javaFunctions(jsc).gemfireRegion(regionPath, connConf); - GemFireJavaRegionRDD rdd4 = javaFunctions(jsc).gemfireRegion(regionPath, connConf, emptyProps); - GemFireJavaRegionRDD rdd5 = rdd1.where("value.intValue() < 50"); - - HashMap expectedMap = new HashMap<>(); - for (int i = 0; i < numObjects; i ++) { - expectedMap.put("k_" + i, i); - } - - matchMapAndPairList(expectedMap, rdd1.collect()); - matchMapAndPairList(expectedMap, rdd2.collect()); - matchMapAndPairList(expectedMap, rdd3.collect()); - matchMapAndPairList(expectedMap, rdd4.collect()); - - HashMap expectedMap2 = new HashMap<>(); - for (int i = 0; i < 50; i ++) { - expectedMap2.put("k_" + i, i); - } - - matchMapAndPairList(expectedMap2, rdd5.collect()); - } - - // -------------------------------------------------------------------------------------------- - // JavaPairRDD.joinGemfireRegion - // -------------------------------------------------------------------------------------------- - - @Test - public void testPairRDDJoinWithSameKeyType() throws Exception { - prepareStrIntRegion(regionPath, 0, numObjects); - JavaPairRDD rdd1 = prepareStrIntJavaPairRDD(-5, 10); - - JavaPairRDD, Integer> rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath); - JavaPairRDD, Integer> rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, connConf); - // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); - - HashMap, Integer> expectedMap = new HashMap<>(); - for (int i = 0; i < 10; i ++) { - expectedMap.put(new Tuple2<>("k_" + i, i), i); - } - matchMapAndPairList(expectedMap, rdd2a.collect()); - matchMapAndPairList(expectedMap, rdd2b.collect()); - } - - static class IntIntPairToStrKeyFunction implements Function, String> { - @Override public String call(Tuple2 pair) throws Exception { - return "k_" + pair._1(); - } - } - - @Test - public void testPairRDDJoinWithDiffKeyType() throws Exception { - prepareStrIntRegion(regionPath, 0, numObjects); - JavaPairRDD rdd1 = prepareIntIntJavaPairRDD(-5, 10); - Function, String> func = new IntIntPairToStrKeyFunction(); - - JavaPairRDD, Integer> rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath, func); - JavaPairRDD, Integer> rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, func, connConf); - //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); - - HashMap, Integer> expectedMap = new HashMap<>(); - for (int i = 0; i < 10; i ++) { - expectedMap.put(new Tuple2<>(i, i * 2), i); - } - matchMapAndPairList(expectedMap, rdd2a.collect()); - matchMapAndPairList(expectedMap, rdd2b.collect()); - } - - // -------------------------------------------------------------------------------------------- - // JavaPairRDD.outerJoinGemfireRegion - // -------------------------------------------------------------------------------------------- - - @Test - public void testPairRDDOuterJoinWithSameKeyType() throws Exception { - prepareStrIntRegion(regionPath, 0, numObjects); - JavaPairRDD rdd1 = prepareStrIntJavaPairRDD(-5, 10); - - JavaPairRDD, Option> rdd2a = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath); - JavaPairRDD, Option> rdd2b = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, connConf); - //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); - - HashMap, Option> expectedMap = new HashMap<>(); - for (int i = -5; i < 10; i ++) { - if (i < 0) - expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) null)); - else - expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i)); - } - matchMapAndPairList(expectedMap, rdd2a.collect()); - matchMapAndPairList(expectedMap, rdd2b.collect()); - } - - @Test - public void testPairRDDOuterJoinWithDiffKeyType() throws Exception { - prepareStrIntRegion(regionPath, 0, numObjects); - JavaPairRDD rdd1 = prepareIntIntJavaPairRDD(-5, 10); - Function, String> func = new IntIntPairToStrKeyFunction(); - - JavaPairRDD, Option> rdd2a = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func); - JavaPairRDD, Option> rdd2b = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func, connConf); - //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); - - HashMap, Option> expectedMap = new HashMap<>(); - for (int i = -5; i < 10; i ++) { - if (i < 0) - expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null)); - else - expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i)); - } - matchMapAndPairList(expectedMap, rdd2a.collect()); - matchMapAndPairList(expectedMap, rdd2b.collect()); - } - - // -------------------------------------------------------------------------------------------- - // JavaRDD.joinGemfireRegion - // -------------------------------------------------------------------------------------------- - - static class IntToStrKeyFunction implements Function { - @Override public String call(Integer x) throws Exception { - return "k_" + x; - } - } - - @Test - public void testRDDJoinWithSameKeyType() throws Exception { - prepareStrIntRegion(regionPath, 0, numObjects); - JavaRDD rdd1 = prepareIntJavaRDD(-5, 10); - - Function func = new IntToStrKeyFunction(); - JavaPairRDD rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath, func); - JavaPairRDD rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, func, connConf); - //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); - - HashMap expectedMap = new HashMap<>(); - for (int i = 0; i < 10; i ++) { - expectedMap.put(i, i); - } - matchMapAndPairList(expectedMap, rdd2a.collect()); - matchMapAndPairList(expectedMap, rdd2b.collect()); - } - - // -------------------------------------------------------------------------------------------- - // JavaRDD.outerJoinGemfireRegion - // -------------------------------------------------------------------------------------------- - - @Test - public void testRDDOuterJoinWithSameKeyType() throws Exception { - prepareStrIntRegion(regionPath, 0, numObjects); - JavaRDD rdd1 = prepareIntJavaRDD(-5, 10); - - Function func = new IntToStrKeyFunction(); - JavaPairRDD> rdd2a = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func); - JavaPairRDD> rdd2b = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func, connConf); - //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); - - HashMap> expectedMap = new HashMap<>(); - for (int i = -5; i < 10; i ++) { - if (i < 0) - expectedMap.put(i, Option.apply((Integer) null)); - else - expectedMap.put(i, Some.apply(i)); - } - matchMapAndPairList(expectedMap, rdd2a.collect()); - matchMapAndPairList(expectedMap, rdd2b.collect()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java deleted file mode 100644 index 5fa03c6..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector; - -import java.io.Serializable; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Properties; -import com.gemstone.gemfire.cache.Declarable; - -/** - * A stock portfolio that consists of multiple {@link Position} objects that - * represent shares of stock (a "security"). Instances of - * Portfolio can be stored in a GemFire Region and - * their contents can be queried using the GemFire query service. - *

- * This class is Serializable because we want it to be distributed - * to multiple members of a distributed system. Because this class is - * Declarable, we can describe instances of it in a GemFire - * cache.xml file. - *

- * - */ -public class Portfolio implements Declarable, Serializable { - - private static final long serialVersionUID = 9097335119586059309L; - - private int id; /* id is used as the entry key and is stored in the entry */ - private String type; - private Map positions = new LinkedHashMap(); - private String status; - - public Portfolio(Properties props) { - init(props); - } - - @Override - public void init(Properties props) { - this.id = Integer.parseInt(props.getProperty("id")); - this.type = props.getProperty("type", "type1"); - this.status = props.getProperty("status", "active"); - - // get the positions. These are stored in the properties object - // as Positions, not String, so use Hashtable protocol to get at them. - // the keys are named "positionN", where N is an integer. - for (Map.Entry entry: props.entrySet()) { - String key = (String)entry.getKey(); - if (key.startsWith("position")) { - Position pos = (Position)entry.getValue(); - this.positions.put(pos.getSecId(), pos); - } - } - } - - public void setType(String t) {this.type = t; } - - public String getStatus(){ - return status; - } - - public int getId(){ - return this.id; - } - - public Map getPositions(){ - return this.positions; - } - - public String getType() { - return this.type; - } - - public boolean isActive(){ - return status.equals("active"); - } - - @Override - public String toString(){ - StringBuilder buf = new StringBuilder(); - buf.append("\n\tPortfolio [id=" + this.id + " status=" + this.status); - buf.append(" type=" + this.type); - boolean firstTime = true; - for (Map.Entry entry: positions.entrySet()) { - if (!firstTime) { - buf.append(", "); - } - buf.append("\n\t\t"); - buf.append(entry.getKey() + ":" + entry.getValue()); - firstTime = false; - } - buf.append("]"); - return buf.toString(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java deleted file mode 100644 index b8e8be9..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector; - -import java.io.Serializable; -import java.util.Properties; -import com.gemstone.gemfire.cache.Declarable; - -/** - * Represents a number of shares of a stock ("security") held in a {@link - * Portfolio}. - *

- * This class is Serializable because we want it to be distributed - * to multiple members of a distributed system. Because this class is - * Declarable, we can describe instances of it in a GemFire - * cache.xml file. - *

- * - */ -public class Position implements Declarable, Serializable { - - private static final long serialVersionUID = -8229531542107983344L; - - private String secId; - private double qty; - private double mktValue; - - public Position(Properties props) { - init(props); - } - - @Override - public void init(Properties props) { - this.secId = props.getProperty("secId"); - this.qty = Double.parseDouble(props.getProperty("qty")); - this.mktValue = Double.parseDouble(props.getProperty("mktValue")); - } - - public String getSecId(){ - return this.secId; - } - - public double getQty(){ - return this.qty; - } - - public double getMktValue() { - return this.mktValue; - } - - @Override - public String toString(){ - return new StringBuilder() - .append("Position [secId=").append(secId) - .append(" qty=").append(this.qty) - .append(" mktValue=").append(mktValue).append("]").toString(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml deleted file mode 100644 index 79893d6..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml +++ /dev/null @@ -1,49 +0,0 @@ - - - - - - - - - - - - - - java.lang.String - java.lang.Integer - - - - - - java.lang.String - java.lang.String - - - - - - java.lang.String - java.lang.String - - - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml deleted file mode 100644 index 3023959..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml +++ /dev/null @@ -1,57 +0,0 @@ - - - - - - - - - - - - - - - java.lang.String - java.lang.Integer - - - - - - java.lang.String - java.lang.Integer - - - - - - java.lang.String - java.lang.Integer - - - - - - java.lang.String - java.lang.Integer - - - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala deleted file mode 100644 index 10c7eaf..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala +++ /dev/null @@ -1,598 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector - -import java.util.Properties -import com.gemstone.gemfire.cache.query.QueryService -import com.gemstone.gemfire.cache.query.internal.StructImpl -import io.pivotal.gemfire.spark.connector._ -import com.gemstone.gemfire.cache.Region -import io.pivotal.gemfire.spark.connector.internal.{RegionMetadata, DefaultGemFireConnectionManager} -import io.pivotal.gemfire.spark.connector.internal.oql.{RDDConverter, QueryRDD} -import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster -import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils -import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream} -import org.apache.spark.{SparkContext, SparkConf} -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import scala.collection.JavaConversions -import scala.reflect.ClassTag - -case class Number(str: String, len: Int) - -class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster { - - var sc: SparkContext = null - - override def beforeAll() { - // start gemfire cluster, and spark context - val settings = new Properties() - settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml") - settings.setProperty("num-of-servers", "2") - val locatorPort = GemFireCluster.start(settings) - - // start spark context in local mode - IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO", - "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG") - val conf = new SparkConf() - .setAppName("BasicIntegrationTest") - .setMaster("local[2]") - .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - .set(GemFireLocatorPropKey, s"localhost[$locatorPort]") - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator") - - sc = new SparkContext(conf) - } - - override def afterAll() { - // stop connection, spark context, and gemfire cluster - DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf)) - sc.stop() - GemFireCluster.stop() - } - - //Convert Map[Object, Object] to java.util.Properties - private def map2Props(map: Map[Object, Object]): java.util.Properties = - (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} - - // =========================================================== - // DefaultGemFireConnection functional tests - // =========================================================== - - test("DefaultGemFireConnection.validateRegion()") { - val conn = GemFireConnectionConf(sc.getConf).getConnection - - // normal exist-region - var regionPath: String = "str_str_region" - conn.validateRegion[String, String](regionPath) - - // non-exist region - regionPath = "non_exist_region" - try { - conn.validateRegion[String, String](regionPath) - fail("validateRegion failed to catch non-exist region error") - } catch { - case e: RuntimeException => - if (! e.getMessage.contains(s"The region named $regionPath was not found")) - fail("validateRegion gives wrong exception on non-exist region", e) - case e: Throwable => - fail("validateRegion gives wrong exception on non-exist region", e) - } - - // Note: currently, can't catch type mismatch error - conn.validateRegion[String, Integer]("str_str_region") - } - - test("DefaultGemFireConnection.getRegionMetadata()") { - val conn = GemFireConnectionConf(sc.getConf).getConnection - - // exist region - validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false) - validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false) - validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true) - - // non-exist region - assert(! conn.getRegionMetadata("no_exist_region").isDefined) - } - - def validateRegionMetadata( - conn: GemFireConnection, regionPath: String, partitioned: Boolean, buckets: Int, - keyType: String, valueType: String, emptyMap: Boolean): Unit = { - - val mdOption = conn.getRegionMetadata(regionPath) - val md = mdOption.get - - assert(md.getRegionPath == s"/$regionPath") - assert(md.isPartitioned == partitioned) - assert(md.getKeyTypeName == keyType) - assert(md.getValueTypeName == valueType) - assert(md.getTotalBuckets == buckets) - if (emptyMap) assert(md.getServerBucketMap == null) - else assert(md.getServerBucketMap != null) - } - - test("DefaultGemFireConnection.getRegionProxy()") { - val conn = GemFireConnectionConf(sc.getConf).getConnection - - val region1 = conn.getRegionProxy[String, String]("str_str_region") - region1.put("1", "One") - assert(region1.get("1") == "One") - region1.remove("1") - assert(region1.get("1") == null) - - // getRegionProxy doesn't fail when region doesn't exist - val region2 = conn.getRegionProxy[String, String]("non_exist_region") - try { - region2.put("1", "One") - fail("getRegionProxy failed to catch non-exist region error") - } catch { - case e: Exception => - if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region named /non_exist_region was not found")) { - e.printStackTrace() - fail("validateRegion gives wrong exception on non-exist region", e) - } - } - } - - // Note: DefaultGemFireConnecton.getQuery() and getRegionData() are covered by - // RetrieveRegionIntegrationTest.scala and following OQL tests. - - // =========================================================== - // OQL functional tests - // =========================================================== - - private def initRegion(regionName: String): Unit = { - - //Populate some data in the region - val conn = GemFireConnectionConf(sc.getConf).getConnection - val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) - rgn.removeAll(rgn.keySetOnServer()) - - //This will call the implicit conversion map2Properties in connector package object, since it is Map[String, String] - var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42")) - var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29")) - val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active", - "position1" -> position1, "position2" -> position2))) - rgn.put("1", portfolio1) - - position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925")) - position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972")) - val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive", - "position1" -> position1, "position2" -> position2))) - rgn.put("2", portfolio2) - - position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32")) - position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373")) - val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active", - "position1" -> position1, "position2" -> position2))) - rgn.put("3", portfolio3) - - position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572")) - position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34")) - val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive", - "position1" -> position1, "position2" -> position2))) - rgn.put("4", portfolio4) - - position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572")) - position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34")) - val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active", - "position1" -> position1, "position2" -> position2))) - rgn.put("5", portfolio5) - - position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572")) - position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34")) - val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive", - "position1" -> position1, "position2" -> position2))) - rgn.put("6", portfolio6) - - position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32")) - position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373")) - val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active", - "position1" -> position1, "position2" -> position2))) - //Not using null, due to intermittent query failure on column containing null, likely a Spark SQL bug - //portfolio7.setType(null) - rgn.put("7", portfolio7) - } - - private def getQueryRDD[T: ClassTag]( - query: String, connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)): QueryRDD[T] = - new QueryRDD[T](sc, query, connConf) - - test("Run GemFire OQL query and convert the returned QueryRDD to DataFrame: Partitioned Region") { - simpleQuery("obj_obj_region") - } - - test("Run GemFire OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") { - simpleQuery("obj_obj_rep_region") - } - - private def simpleQuery(regionName: String) { - //Populate some data in the region - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[String, String] = conn.getRegionProxy(regionName) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three"))) - - //Create QueryRDD using OQL - val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName") - - //verify the QueryRDD - val oqlRS: Array[String] = OQLResult.collect() - oqlRS should have length 3 - oqlRS should contain theSameElementsAs List("one", "two", "three") - - //Convert QueryRDD to DataFrame - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - // this is used to implicitly convert an RDD to a DataFrame. - import sqlContext.implicits._ - val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF() - //Register dataFrame as a table of two columns of type String and Int respectively - dataFrame.registerTempTable("numberTable") - - //Issue SQL query against the table - val SQLResult = sqlContext.sql("SELECT * FROM numberTable") - //Verify the SQL query result, r(0) mean column 0 - val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect() - sqlRS should have length 3 - sqlRS should contain theSameElementsAs List("one", "two", "three") - - //Convert QueryRDD to DataFrame using RDDConverter - val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) - //Register dataFrame2 as a table of two columns of type String and Int respectively - dataFrame2.registerTempTable("numberTable2") - - //Issue SQL query against the table - val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2") - //Verify the SQL query result, r(0) mean column 0 - val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect() - sqlRS2 should have length 3 - sqlRS2 should contain theSameElementsAs List("one", "two", "three") - - //Remove the region entries, because other tests might use the same region as well - List("1", "2", "3").foreach(rgn.remove) - } - - test("Run GemFire OQL query and directly return DataFrame: Partitioned Region") { - simpleQueryDataFrame("obj_obj_region") - } - - test("Run GemFire OQL query and directly return DataFrame: Replicated Region") { - simpleQueryDataFrame("obj_obj_rep_region") - } - - private def simpleQueryDataFrame(regionName: String) { - //Populate some data in the region - val conn = GemFireConnectionConf(sc.getConf).getConnection - val rgn: Region[String, String] = conn.getRegionProxy(regionName) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three"))) - - //Create DataFrame using GemFire OQL - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - val dataFrame = sqlContext.gemfireOQL(s"select * from /$regionName") - dataFrame.registerTempTable("numberTable") - - //Issue SQL query against the table - val SQLResult = sqlContext.sql("SELECT * FROM numberTable") - //Verify the SQL query result, r(0) mean column 0 - val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect() - sqlRS should have length 3 - sqlRS should contain theSameElementsAs List("one", "two", "three") - - //Remove the region entries, because other tests might use the same region as well - List("1", "2", "3").foreach(rgn.remove) - } - - test("GemFire OQL query with UDT: Partitioned Region") { - queryUDT("obj_obj_region") - } - - test("GemFire OQL query with UDT: Replicated Region") { - queryUDT("obj_obj_rep_region") - } - - private def queryUDT(regionName: String) { - - //Populate some data in the region - val conn = GemFireConnectionConf(sc.getConf).getConnection - val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) - rgn.removeAll(rgn.keySetOnServer()) - val e1: Employee = new Employee("hello", 123) - val e2: Employee = new Employee("world", 456) - rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2))) - - //Create QueryRDD using OQL - val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName") - - //verify the QueryRDD - val oqlRS: Array[Object] = OQLResult.collect() - oqlRS should have length 2 - oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456) - - //Convert QueryRDD to DataFrame - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - - //Convert QueryRDD to DataFrame using RDDConverter - val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) - dataFrame.registerTempTable("employee") - val SQLResult = sqlContext.sql("SELECT * FROM employee") - - //Verify the SQL query result - val sqlRS = SQLResult.map(r => r(0)).collect() - sqlRS should have length 2 - sqlRS should contain theSameElementsAs List("hello", "world") - - List("1", "2").foreach(rgn.remove) - } - - test("GemFire OQL query with UDT and directly return DataFrame: Partitioned Region") { - queryUDTDataFrame("obj_obj_region") - } - - test("GemFire OQL query with UDT and directly return DataFrame: Replicated Region") { - queryUDTDataFrame("obj_obj_rep_region") - } - - private def queryUDTDataFrame(regionName: String) { - //Populate some data in the region - val conn = GemFireConnectionConf(sc.getConf).getConnection - val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) - rgn.removeAll(rgn.keySetOnServer()) - val e1: Employee = new Employee("hello", 123) - val e2: Employee = new Employee("world", 456) - rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2))) - - //Create DataFrame using GemFire OQL - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - val dataFrame = sqlContext.gemfireOQL(s"select name, age from /$regionName") - - dataFrame.registerTempTable("employee") - val SQLResult = sqlContext.sql("SELECT * FROM employee") - - //Verify the SQL query result - val sqlRS = SQLResult.map(r => r(0)).collect() - sqlRS should have length 2 - sqlRS should contain theSameElementsAs List("hello", "world") - - List("1", "2").foreach(rgn.remove) - } - - test("GemFire OQL query with more complex UDT: Partitioned Region") { - complexUDT("obj_obj_region") - } - - test("GemFire OQL query with more complex UDT: Replicated Region") { - complexUDT("obj_obj_rep_region") - } - - private def complexUDT(regionName: String) { - - initRegion(regionName) - - //Create QueryRDD using OQL - val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'") - - //verify the QueryRDD - val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId) - oqlRS should contain theSameElementsAs List(1, 3, 5, 7) - - //Convert QueryRDD to DataFrame - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - - //Convert QueryRDD to DataFrame using RDDConverter - val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) - - dataFrame.registerTempTable("Portfolio") - - val SQLResult = sqlContext.sql("SELECT * FROM Portfolio") - - //Verify the SQL query result - val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType) - sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4") - } - - test("GemFire OQL query with more complex UDT and directly return DataFrame: Partitioned Region") { - complexUDTDataFrame("obj_obj_region") - } - - test("GemFire OQL query with more complex UDT and directly return DataFrame: Replicated Region") { - complexUDTDataFrame("obj_obj_rep_region") - } - - private def complexUDTDataFrame(regionName: String) { - - initRegion(regionName) - - //Create DataFrame using GemFire OQL - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - val dataFrame = sqlContext.gemfireOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'") - dataFrame.registerTempTable("Portfolio") - - val SQLResult = sqlContext.sql("SELECT * FROM Portfolio") - - //Verify the SQL query result - val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType) - sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4") - } - - test("GemFire OQL query with more complex UDT with Projection: Partitioned Region") { - queryComplexUDTProjection("obj_obj_region") - } - - test("GemFire OQL query with more complex UDT with Projection: Replicated Region") { - queryComplexUDTProjection("obj_obj_rep_region") - } - - private def queryComplexUDTProjection(regionName: String) { - - initRegion(regionName) - - //Create QueryRDD using OQL - val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""") - - //verify the QueryRDD - val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int]) - oqlRS should contain theSameElementsAs List(1, 3, 5, 7) - - //Convert QueryRDD to DataFrame - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - - //Convert QueryRDD to DataFrame using RDDConverter - val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) - - dataFrame.registerTempTable("Portfolio") - - val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") - - //Verify the SQL query result - val sqlRS = SQLResult.collect().map(r => r(0)) - sqlRS should contain theSameElementsAs List(3) - } - - test("GemFire OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") { - queryComplexUDTProjectionDataFrame("obj_obj_region") - } - - test("GemFire OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") { - queryComplexUDTProjectionDataFrame("obj_obj_rep_region") - } - - private def queryComplexUDTProjectionDataFrame(regionName: String) { - - initRegion(regionName) - - //Create DataFrame using GemFire OQL - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - val dataFrame = sqlContext.gemfireOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""") - dataFrame.registerTempTable("Portfolio") - - val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") - - //Verify the SQL query result - val sqlRS = SQLResult.collect().map(r => r(0)) - sqlRS should contain theSameElementsAs List(3) - } - - test("GemFire OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") { - queryComplexUDTNestProjectionDataFrame("obj_obj_region") - } - - test("GemFire OQL query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") { - queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region") - } - - private def queryComplexUDTNestProjectionDataFrame(regionName: String) { - - initRegion(regionName) - - //Create DataFrame using GemFire OQL - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - val dataFrame = sqlContext.gemfireOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""") - dataFrame.registerTempTable("Portfolio") - - val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") - - //Verify the SQL query result - val sqlRS = SQLResult.collect().map(r => r(0)) - sqlRS should contain theSameElementsAs List(3) - } - - test("Undefined instance deserialization: Partitioned Region") { - undefinedInstanceDeserialization("obj_obj_region") - } - - test("Undefined instance deserialization: Replicated Region") { - undefinedInstanceDeserialization("obj_obj_rep_region") - } - - private def undefinedInstanceDeserialization(regionName: String) { - - val conn = GemFireConnectionConf(sc.getConf).getConnection - val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) - rgn.removeAll(rgn.keySetOnServer()) - - //Put some new data - rgn.put("1", "one") - - //Query some non-existent columns, which should return UNDEFINED - val sqlContext = new org.apache.spark.sql.SQLContext(sc) - val dataFrame = sqlContext.gemfireOQL(s"SELECT col100, col200 FROM /$regionName") - val col1 = dataFrame.first().apply(0) - val col2 = dataFrame.first().apply(1) - assert(col1 == QueryService.UNDEFINED) - assert(col2 == QueryService.UNDEFINED) - //Verify that col1 and col2 refer to the same Undefined object - assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef]) - } - - test("RDD.saveToGemFire") { - val regionName = "str_str_region" - // generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66)) - val data = (1 to 6).map(_.toString).map(e=> (e, e*2)) - val rdd = sc.parallelize(data) - rdd.saveToGemfire(regionName) - - // verify - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName) - println("region key set on server: " + region.keySetOnServer()) - assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer())) - (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e))) - } - - // =========================================================== - // DStream.saveToGemfire() functional tests - // =========================================================== - - test("Basic DStream test") { - import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener} - import io.pivotal.gemfire.spark.connector.streaming._ - import org.apache.spark.streaming.ManualClockHelper - - class TestStreamListener extends StreamingListener { - var count = 0 - override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1 - } - - def batchDuration = Seconds(1) - val ssc = new StreamingContext(sc, batchDuration) - val input = Seq(1 to 4, 5 to 8, 9 to 12) - val dstream = new TestInputDStream(ssc, input, 2) - dstream.saveToGemfire[String, Int]("str_int_region", (e: Int) => (e.toString, e)) - try { - val listener = new TestStreamListener - ssc.addStreamingListener(listener) - ssc.start() - ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length) - while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50) - } catch { - case e: Exception => e.printStackTrace(); throw e -// } finally { -// ssc.stop() - } - - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val region: Region[String, Int] = conn.getRegionProxy("str_int_region") - - // verify gemfire region contents - println("region key set on server: " + region.keySetOnServer()) - assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer())) - (1 to 12).foreach(e => assert(e == region.get(e.toString))) - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala deleted file mode 100644 index c286491..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector - -import java.util.Properties - -import io.pivotal.gemfire.spark.connector._ -import com.gemstone.gemfire.cache.Region -import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager -import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster -import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils -import org.apache.spark.{SparkContext, SparkConf} -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import java.util.{HashMap => JHashMap} - -class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster { - - var sc: SparkContext = null - val numServers = 3 - val numObjects = 1000 - - override def beforeAll() { - // start gemfire cluster, and spark context - val settings = new Properties() - settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml") - settings.setProperty("num-of-servers", numServers.toString) - val locatorPort = GemFireCluster.start(settings) - - // start spark context in local mode - IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO", - "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG") - val conf = new SparkConf() - .setAppName("RDDJoinRegionIntegrationTest") - .setMaster("local[2]") - .set(GemFireLocatorPropKey, s"localhost[$locatorPort]") - sc = new SparkContext(conf) - } - - override def afterAll() { - // stop connection, spark context, and gemfire cluster - DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf)) - sc.stop() - GemFireCluster.stop() - } - -// def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = { -// assert(map1.size == map2.size) -// map1.foreach(e => { -// assert(map2.contains(e._1)) -// assert (e._2 == map2.get(e._1).get) -// }) -// } - - // -------------------------------------------------------------------------------------------- - // PairRDD.joinGemfireRegion[K2 <: K, V2](regionPath, connConf): GemFireJoinRDD[(K, V), K, V2] - // -------------------------------------------------------------------------------------------- - - test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], replicated region", JoinTest) { - verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region") - } - - test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned region", JoinTest) { - verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region") - } - - test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", JoinTest) { - verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region") - } - - def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = { - val entriesMap: JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) - - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(entriesMap) - - val data = (-5 until 50).map(x => ("k_" + x, x*2)) - val rdd = sc.parallelize(data) - - val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, connConf) - val rdd2Content = rdd2.collect() - - val expectedMap = (0 until 50).map(i => ((s"k_$i", i*2), i)).toMap - // matchMaps[(String, Int), Int](expectedMap, rdd2Content.toMap) - assert(expectedMap == rdd2Content.toMap) - } - - // ------------------------------------------------------------------------------------------------------ - // PairRDD.joinGemfireRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GemFireJoinRDD[(K, V), K2, V2] - // ------------------------------------------------------------------------------------------------------- - - test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], replicated region", JoinTest) { - verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region") - } - - test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned region", JoinTest) { - verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region") - } - - test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", JoinTest) { - verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region") - } - - def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = { - val entriesMap: JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) - - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(entriesMap) - - val data = (-5 until 50).map(x => (x, x*2)) - val rdd = sc.parallelize(data) - - val func :((Int, Int)) => String = pair => s"k_${pair._1}" - - val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, func /*, connConf*/) - val rdd2Content = rdd2.collect() - - val expectedMap = (0 until 50).map(i => ((i, i*2), i)).toMap - // matchMaps[(Int, Int), Int](expectedMap, rdd2Content.toMap) - assert(expectedMap == rdd2Content.toMap) - } - - // ------------------------------------------------------------------------------------------------ - // PairRDD.outerJoinGemfireRegion[K2 <: K, V2](regionPath, connConf): GemFireJoinRDD[(K, V), K, V2] - // ------------------------------------------------------------------------------------------------ - - test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], replicated region", OuterJoinTest) { - verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region") - } - - test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned region", OuterJoinTest) { - verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region") - } - - test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", OuterJoinTest) { - verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region") - } - - def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = { - val entriesMap: JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) - - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(entriesMap) - - val data = (-5 until 50).map(x => ("k_" + x, x*2)) - val rdd = sc.parallelize(data) - - val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath /*, connConf*/) - val rdd2Content = rdd2.collect() - - val expectedMap = (-5 until 50).map { - i => if (i < 0) ((s"k_$i", i * 2), None) - else ((s"k_$i", i*2), Some(i))}.toMap - // matchMaps[(String, Int), Option[Int]](expectedMap, rdd2Content.toMap) - assert(expectedMap == rdd2Content.toMap) - } - - // ------------------------------------------------------------------------------------------------------ - // PairRDD.joinGemfireRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GemFireJoinRDD[(K, V), K2, V2] - // ------------------------------------------------------------------------------------------------------- - - test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], replicated region", OuterJoinTest) { - verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region") - } - - test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned region", OuterJoinTest) { - verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region") - } - - test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", OuterJoinTest) { - verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region") - } - - def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = { - val entriesMap: JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) - - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(entriesMap) - - val data = (-5 until 50).map(x => (x, x*2)) - val rdd = sc.parallelize(data) - - val func :((Int, Int)) => String = pair => s"k_${pair._1}" - - val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath, func, connConf) - val rdd2Content = rdd2.collect() - - val expectedMap = (-5 until 50).map { - i => if (i < 0) ((i, i * 2), None) - else ((i, i*2), Some(i))}.toMap - // matchMaps[(Int, Int), Option[Int]](expectedMap, rdd2Content.toMap) - assert(expectedMap == rdd2Content.toMap) - } - - // -------------------------------------------------------------------------------------------- - // RDD.joinGemfireRegion[K, V](regionPath, T => K, connConf): GemFireJoinRDD[T, K, V] - // -------------------------------------------------------------------------------------------- - - test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], replicated region", JoinTest) { - verifyRDDJoinRegion("rr_str_int_region") - } - - test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], partitioned region", JoinTest) { - verifyRDDJoinRegion("pr_str_int_region") - } - - test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], partitioned redundant region", JoinTest) { - verifyRDDJoinRegion("pr_r_str_int_region") - } - - def verifyRDDJoinRegion(regionPath: String): Unit = { - val entriesMap: JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) - - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(entriesMap) - - val data = (-5 until 50).map(x => s"k_$x") - val rdd = sc.parallelize(data) - - val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, x => x, connConf) - val rdd2Content = rdd2.collect() - - val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap - // matchMaps[String, Int](expectedMap, rdd2Content.toMap) - assert(expectedMap == rdd2Content.toMap) - } - - // -------------------------------------------------------------------------------------------- - // RDD.outerJoinGemfireRegion[K, V](regionPath, T => K, connConf): GemFireJoinRDD[T, K, V] - // -------------------------------------------------------------------------------------------- - - test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], replicated region", OnlyTest) { - verifyRDDOuterJoinRegion("rr_str_int_region") - } - - test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], partitioned region", OnlyTest) { - verifyRDDOuterJoinRegion("pr_str_int_region") - } - - test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], partitioned redundant region", OnlyTest) { - verifyRDDOuterJoinRegion("pr_r_str_int_region") - } - - def verifyRDDOuterJoinRegion(regionPath: String): Unit = { - val entriesMap: JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) - - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(entriesMap) - - val data = (-5 until 50).map(x => s"k_$x") - val rdd = sc.parallelize(data) - - val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath, x => x /*, connConf */) - val rdd2Content = rdd2.collect() - - val expectedMap = (-5 until 50).map { - i => if (i < 0) (s"k_$i", None) - else (s"k_$i", Some(i))}.toMap - // matchMaps[String, Option[Int]](expectedMap, rdd2Content.toMap) - assert(expectedMap == rdd2Content.toMap) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala deleted file mode 100644 index 0ab8110..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector - -import java.util.Properties - -import io.pivotal.gemfire.spark.connector._ -import com.gemstone.gemfire.cache.Region -import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager -import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster -import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils -import org.apache.spark.{SparkContext, SparkConf} -import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers} -import java.util.{HashMap => JHashMap} - - -class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster { - - var sc: SparkContext = null - val numServers = 4 - val numObjects = 1000 - - override def beforeAll() { - // start gemfire cluster, and spark context - val settings = new Properties() - settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml") - settings.setProperty("num-of-servers", numServers.toString) - val locatorPort = GemFireCluster.start(settings) - - // start spark context in local mode - IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO", - "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG") - val conf = new SparkConf() - .setAppName("RetrieveRegionIntegrationTest") - .setMaster("local[2]") - .set(GemFireLocatorPropKey, s"localhost[$locatorPort]") - sc = new SparkContext(conf) - } - - override def afterAll() { - // stop connection, spark context, and gemfire cluster - DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf)) - sc.stop() - GemFireCluster.stop() - } - - def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = { - //Populate some data in the region - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[K, V] = conn.getRegionProxy(regionName) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(entriesMap) - verifyRetrieveRegion[K,V](regionName, entriesMap) - } - - def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V]) = { - val rdd = sc.gemfireRegion(regionName) - val collectedObjs = rdd.collect() - collectedObjs should have length entriesMap.size - import scala.collection.JavaConverters._ - matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap) - } - - def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = { - assert(map1.size == map2.size) - map1.foreach(e => { - assert(map2.contains(e._1)) - assert (e._2 == map2.get(e._1).get) - } - ) - } - - //Retrieve region for Partitioned Region where some nodes are empty (empty iterator) - //This test has to run first...the rest of the tests always use the same num objects - test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") { - val numObjects = numServers - 1 - val entriesMap:JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("key_" + i, i)) - executeTest[String, Int]("rr_str_int_region", numObjects, entriesMap) - } - - //Test for retrieving from region containing string key and int value - def verifyRetrieveStringStringRegion(regionName:String) = { - val entriesMap:JHashMap[String, String] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("key_" + i, "value_" + i)) - executeTest[String, String](regionName, numObjects, entriesMap) - } - - test("Retrieve Region with replicate redundant string string") { - verifyRetrieveStringStringRegion("rr_obj_obj_region") - } - - test("Retrieve Region with partitioned string string") { - verifyRetrieveStringStringRegion("pr_obj_obj_region") - } - - test("Retrieve Region with partitioned redundant string string") { - verifyRetrieveStringStringRegion("pr_r_obj_obj_region") - } - - - //Test for retrieving from region containing string key and string value - def verifyRetrieveStringIntRegion(regionName:String) = { - val entriesMap:JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("key_" + i, i)) - executeTest[String, Int](regionName, numObjects, entriesMap) - } - - test("Retrieve Region with replicate string int region") { - verifyRetrieveStringIntRegion("rr_str_int_region") - } - - test("Retrieve Region with partitioned string int region") { - verifyRetrieveStringIntRegion("pr_str_int_region") - } - - test("Retrieve Region with partitioned redundant string int region") { - verifyRetrieveStringIntRegion("pr_r_str_int_region") - } - - //Tests for retrieving from region containing string key and object value - def verifyRetrieveStringObjectRegion(regionName:String) = { - val entriesMap:JHashMap[String, Object] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("key_" + i, new Employee("ename" + i, i))) - executeTest[String, Object](regionName, numObjects, entriesMap) - } - - test("Retrieve Region with replicate string obj") { - verifyRetrieveStringObjectRegion("rr_obj_obj_region") - } - - test("Retrieve Region with partitioned string obj") { - verifyRetrieveStringObjectRegion("pr_obj_obj_region") - } - - test("Retrieve Region with partitioned redundant string obj") { - verifyRetrieveStringObjectRegion("pr_r_obj_obj_region") - } - - //Test for retrieving from region containing string key and map value - def verifyRetrieveStringMapRegion(regionName:String) = { - val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap() - (0 until numObjects).map(i => { - val hashMap:JHashMap[String, String] = new JHashMap() - hashMap.put("mapKey:" + i, "mapValue:" + i) - entriesMap.put("key_" + i, hashMap) - }) - executeTest(regionName, numObjects, entriesMap) - } - - test("Retrieve Region with replicate string map region") { - verifyRetrieveStringMapRegion("rr_obj_obj_region") - } - - test("Retrieve Region with partitioned string map region") { - verifyRetrieveStringMapRegion("pr_obj_obj_region") - } - - test("Retrieve Region with partitioned redundant string map region") { - verifyRetrieveStringMapRegion("pr_r_obj_obj_region") - } - - //Test and helpers specific for retrieving from region containing string key and byte[] value - def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = { - //Populate some data in the region - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[K, Array[Byte]] = conn.getRegionProxy(regionName) - rgn.putAll(entriesMap) - verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap) - } - - def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]]) = { - val rdd = sc.gemfireRegion(regionName) - val collectedObjs = rdd.collect() - collectedObjs should have length entriesMap.size - import scala.collection.JavaConverters._ - matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap) - } - - def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = { - map1.foreach(e => { - assert(map2.contains(e._1)) - assert (java.util.Arrays.equals(e._2, map2.get(e._1).get)) - } - ) - assert(map1.size == map2.size) - - } - - def verifyRetrieveStringByteArrayRegion(regionName:String) = { - val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte))) - executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap) - } - - test("Retrieve Region with replicate region string byte[] region") { - verifyRetrieveStringByteArrayRegion("rr_obj_obj_region") - } - - test("Retrieve Region with partition region string byte[] region") { - verifyRetrieveStringByteArrayRegion("pr_obj_obj_region") - } - - test("Retrieve Region with partition redundant region string byte[] region") { - verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region") - } - - test("Retrieve Region with where clause on partitioned redundant region", FilterTest) { - verifyRetrieveRegionWithWhereClause("pr_r_str_int_region") - } - - test("Retrieve Region with where clause on partitioned region", FilterTest) { - verifyRetrieveRegionWithWhereClause("pr_str_int_region") - } - - test("Retrieve Region with where clause on replicated region", FilterTest) { - verifyRetrieveRegionWithWhereClause("rr_str_int_region") - } - - def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = { - val entriesMap: JHashMap[String, Int] = new JHashMap() - (0 until numObjects).map(i => entriesMap.put("key_" + i, i)) - - val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) - val conn = connConf.getConnection - val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) - rgn.removeAll(rgn.keySetOnServer()) - rgn.putAll(entriesMap) - - val rdd = sc.gemfireRegion(regionPath).where("value.intValue() < 50") - val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap - val collectedObjs = rdd.collect() - // collectedObjs should have length expectedMap.size - matchMaps[String, Int](expectedMap, collectedObjs.toMap) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala deleted file mode 100644 index 298dc4a..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark - -import org.scalatest.Tag - -package object connector { - - object OnlyTest extends Tag("OnlyTest") - object FetchDataTest extends Tag("FetchDateTest") - object FilterTest extends Tag("FilterTest") - object JoinTest extends Tag("JoinTest") - object OuterJoinTest extends Tag("OuterJoinTest") - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala deleted file mode 100644 index d8e07f5..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector.testkit - -import java.util.Properties - -trait GemFireCluster { - def startGemFireCluster(settings: Properties): Int = { - println("=== GemFireCluster start()") - GemFireCluster.start(settings) - } -} - -object GemFireCluster { - private var gemfire: Option[GemFireRunner] = None - - def start(settings: Properties): Int = { - gemfire.map(_.stopGemFireCluster()) // Clean up any old running GemFire instances - val runner = new GemFireRunner(settings) - gemfire = Some(runner) - runner.getLocatorPort - } - - def stop(): Unit = { - println("=== GemFireCluster shutdown: " + gemfire.toString) - gemfire match { - case None => println("Nothing to shutdown.") - case Some(runner) => runner.stopGemFireCluster() - } - gemfire = None - println("=== GemFireCluster shutdown finished.") - } -}