From: dschneider@apache.org
To: commits@geode.incubator.apache.org
Date: Mon, 06 Jul 2015 21:27:14 -0000
Message-Id: <368b8b3c13c240c28ef7348771eebe5a@git.apache.org>
In-Reply-To: <1367993cf1c24bbea023c2a9c86dc3be@git.apache.org>
References: <1367993cf1c24bbea023c2a9c86dc3be@git.apache.org>
Subject: [2/5] incubator-geode git commit: GEODE-9: Imported gemfire-spark-connector from geode-1.0.0-SNAPSHOT-2.src.tar

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
new file mode 100644
index 0000000..08cd042
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
@@ -0,0 +1,22 @@ +package
io.pivotal.gemfire.spark.connector.internal.oql + +import com.gemstone.gemfire.cache.query.internal.StructImpl +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ + +class RowBuilder[T](queryRDD: QueryRDD[T]) { + + /** + * Convert QueryRDD to RDD of Row + * @return RDD of Rows + */ + def toRowRDD(): RDD[Row] = { + val rowRDD = queryRDD.map(row => { + row match { + case si: StructImpl => Row.fromSeq(si.getFieldValues) + case obj: Object => Row.fromSeq(Seq(obj)) + } + }) + rowRDD + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala new file mode 100644 index 0000000..d878a2a --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala @@ -0,0 +1,57 @@ +package io.pivotal.gemfire.spark.connector.internal.oql + +import com.gemstone.gemfire.cache.query.internal.StructImpl +import org.apache.spark.sql.types._ +import scala.collection.mutable.ListBuffer +import org.apache.spark.Logging + +class SchemaBuilder[T](queryRDD: QueryRDD[T]) extends Logging { + + val nullStructType = StructType(Nil) + + val typeMap:Map[Class[_], DataType] = Map( + (classOf[java.lang.String], StringType), + (classOf[java.lang.Integer], IntegerType), + (classOf[java.lang.Short], ShortType), + (classOf[java.lang.Long], LongType), + (classOf[java.lang.Double], DoubleType), + (classOf[java.lang.Float], FloatType), + (classOf[java.lang.Boolean], BooleanType), + (classOf[java.lang.Byte], ByteType), + (classOf[java.util.Date], DateType), + (classOf[java.lang.Object], nullStructType) + ) + + /** + * Analyse QueryRDD to get the Spark schema + * @return The schema represented by Spark StructType + */ + def toSparkSchema(): StructType = { + val row = queryRDD.first() + val tpe = row match { + case r: StructImpl => constructFromStruct(r) + case null => StructType(StructField("col1", NullType) :: Nil) + case default => + val value = typeMap.getOrElse(default.getClass(), nullStructType) + StructType(StructField("col1", value) :: Nil) + } + logInfo(s"Schema: $tpe") + tpe + } + + def constructFromStruct(r:StructImpl) = { + val names = r.getFieldNames + val values = r.getFieldValues + val lb = new ListBuffer[StructField]() + for (i <- 0 until names.length) { + val name = names(i) + val value = values(i) + val dataType = value match { + case null => NullType + case default => typeMap.getOrElse(default.getClass, nullStructType) + } + lb += StructField(name, dataType) + } + StructType(lb.toSeq) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala new file mode 100644 index 0000000..497aadf --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala @@ -0,0 +1,30 @@ +package io.pivotal.gemfire.spark.connector.internal.oql + +import com.esotericsoftware.kryo.{Kryo, Serializer} +import com.esotericsoftware.kryo.io.{Output, Input} +import com.gemstone.gemfire.cache.query.QueryService +import com.gemstone.gemfire.cache.query.internal.Undefined + +/** + * This is the customized serializer to serialize QueryService.UNDEFINED, + * i.e. com.gemstone.gemfire.cache.query.internal.Undefined, in order to + * guarantee the singleton Undefined after its deserialization within Spark. + */ +class UndefinedSerializer extends Serializer[Undefined] { + + def write(kryo: Kryo, output: Output, u: Undefined) { + //Only serialize a byte for Undefined + output.writeByte(u.getDSFID) + } + + def read (kryo: Kryo, input: Input, tpe: Class[Undefined]): Undefined = { + //Read DSFID of Undefined + input.readByte() + QueryService.UNDEFINED match { + case null => new Undefined + case _ => + //Avoid calling Undefined constructor again. + QueryService.UNDEFINED.asInstanceOf[Undefined] + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala new file mode 100644 index 0000000..53b61b1 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala @@ -0,0 +1,51 @@ +package io.pivotal.gemfire.spark.connector.internal.rdd + +import com.gemstone.gemfire.cache.Region +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf +import org.apache.spark.{TaskContext, Partition} +import org.apache.spark.rdd.RDD +import scala.collection.JavaConversions._ + +/** + * An `RDD[T, V]` that will represent the result of a join between `left` RDD[T] + * and the specified GemFire Region[K, V]. + */ +class GemFireJoinRDD[T, K, V] private[connector] + ( left: RDD[T], + func: T => K, + val regionPath: String, + val connConf: GemFireConnectionConf + ) extends RDD[(T, V)](left.context, left.dependencies) { + + /** validate region existence when GemFireRDD object is created */ + validate() + + /** Validate region, and make sure it exists. 
*/ + private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) + + override protected def getPartitions: Array[Partition] = left.partitions + + override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = { + val region = connConf.getConnection.getRegionProxy[K, V](regionPath) + if (func == null) computeWithoutFunc(split, context, region) + else computeWithFunc(split, context, region) + } + + /** T is (K, V1) since there's no map function `func` */ + private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = { + val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]] + val leftKeys = leftPairs.map { case (k, v) => k}.toSet + // Note: get all will return (key, null) for non-exist entry, so remove those entries + val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null} + leftPairs.filter{case (k, v) => rightPairs.contains(k)} + .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator + } + + private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = { + val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t))) + val leftKeys = leftPairs.map { case (t, k) => k}.toSet + // Note: get all will return (key, null) for non-exist entry, so remove those entries + val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null} + leftPairs.filter { case (t, k) => rightPairs.contains(k)}.map {case (t, k) => (t, rightPairs.get(k).get)}.toIterator + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala new file mode 100644 index 0000000..bed291c --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala @@ -0,0 +1,53 @@ +package io.pivotal.gemfire.spark.connector.internal.rdd + +import com.gemstone.gemfire.cache.Region +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf +import org.apache.spark.{TaskContext, Partition} +import org.apache.spark.rdd.RDD +import scala.collection.JavaConversions._ + +/** + * An `RDD[ T, Option[V] ]` that represents the result of a left outer join + * between `left` RDD[T] and the specified GemFire Region[K, V]. + */ +class GemFireOuterJoinRDD[T, K, V] private[connector] + ( left: RDD[T], + func: T => K, + val regionPath: String, + val connConf: GemFireConnectionConf + ) extends RDD[(T, Option[V])](left.context, left.dependencies) { + + /** validate region existence when GemFireRDD object is created */ + validate() + + /** Validate region, and make sure it exists. 
*/ + private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) + + override protected def getPartitions: Array[Partition] = left.partitions + + override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = { + val region = connConf.getConnection.getRegionProxy[K, V](regionPath) + if (func == null) computeWithoutFunc(split, context, region) + else computeWithFunc(split, context, region) + } + + /** T is (K1, V1), and K1 and K are the same type since there's no map function `func` */ + private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = { + val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]] + val leftKeys = leftPairs.map { case (k, v) => k}.toSet + // Note: get all will return (key, null) for non-exist entry + val rightPairs = region.getAll(leftKeys) + // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option + leftPairs.map{ case (k, v) => ((k, v).asInstanceOf[T], Option(rightPairs.get(k))) }.toIterator + } + + private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = { + val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t))) + val leftKeys = leftPairs.map { case (t, k) => k}.toSet + // Note: get all will return (key, null) for non-exist entry + val rightPairs = region.getAll(leftKeys) + // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option + leftPairs.map{ case (t, k) => (t, Option(rightPairs.get(k)))}.toIterator + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala new file mode 100644 index 0000000..0895024 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala @@ -0,0 +1,20 @@ +package io.pivotal.gemfire.spark.connector.internal.rdd + +import org.apache.spark.Partition + +/** + * This serializable class represents a GemFireRDD partition. Each partition is mapped + * to one or more buckets of region. The GemFireRDD can materialize the data of the + * partition based on all information contained here. + * @param partitionId partition id, a 0 based number. + * @param bucketSet region bucket id set for this partition. 
Set.empty means whole + * region (used for replicated region) + * @param locations preferred location for this partition + */ +case class GemFireRDDPartition ( + partitionId: Int, bucketSet: Set[Int], locations: Seq[String] = Nil) + extends Partition { + + override def index: Int = partitionId + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala new file mode 100644 index 0000000..7ec8d42 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala @@ -0,0 +1,43 @@ +package io.pivotal.gemfire.spark.connector.internal.rdd + +import io.pivotal.gemfire.spark.connector.GemFireConnection +import io.pivotal.gemfire.spark.connector.internal.RegionMetadata +import org.apache.spark.{Logging, Partition} + +import scala.reflect.ClassTag + +/** + * A GemFireRDD partitioner is used to partition the region into multiple RDD partitions. + */ +trait GemFireRDDPartitioner extends Serializable { + + def name: String + + /** the function that generates partitions */ + def partitions[K: ClassTag, V: ClassTag] + (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] +} + +object GemFireRDDPartitioner extends Logging { + + /** To add new partitioner, just add it to the following list */ + final val partitioners: Map[String, GemFireRDDPartitioner] = + List(OnePartitionPartitioner, ServerSplitsPartitioner).map(e => (e.name, e)).toMap + + /** + * Get a partitioner based on given name, a default partitioner will be returned if there's + * no partitioner for the given name. 
+ */ + def apply(name: String = defaultPartitionedRegionPartitioner.name): GemFireRDDPartitioner = { + val p = partitioners.get(name) + if (p.isDefined) p.get else { + logWarning(s"Invalid preferred partitioner name $name.") + defaultPartitionedRegionPartitioner + } + } + + val defaultReplicatedRegionPartitioner = OnePartitionPartitioner + + val defaultPartitionedRegionPartitioner = ServerSplitsPartitioner + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala new file mode 100644 index 0000000..0c9c34f --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala @@ -0,0 +1,73 @@ +package io.pivotal.gemfire.spark.connector.internal.rdd + +import io.pivotal.gemfire.spark.connector.GemFireConnection +import io.pivotal.gemfire.spark.connector.internal.RegionMetadata +import io.pivotal.gemfire.spark.connector.NumberPartitionsPerServerPropKey +import org.apache.spark.Partition +import scala.collection.JavaConversions._ +import scala.collection.immutable.SortedSet +import scala.collection.mutable +import scala.reflect.ClassTag + +/** This partitioner maps whole region to one GemFireRDDPartition */ +object OnePartitionPartitioner extends GemFireRDDPartitioner { + + override val name = "OnePartition" + + override def partitions[K: ClassTag, V: ClassTag] + (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = + Array[Partition](new GemFireRDDPartition(0, Set.empty)) +} + +/** + * This partitioner maps whole region to N * M GemFire RDD partitions, where M is the number of + * GemFire servers that contain the data for the given region. Th default value of N is 1. + */ +object ServerSplitsPartitioner extends GemFireRDDPartitioner { + + override val name = "ServerSplits" + + override def partitions[K: ClassTag, V: ClassTag] + (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = { + if (md == null) throw new RuntimeException("RegionMetadata is null") + val n = env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt + if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty) + Array[Partition](new GemFireRDDPartition(0, Set.empty)) + else { + val map = mapAsScalaMap(md.getServerBucketMap) + .map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList + .map { case (srv, set) => (srv.getHostName, set) } + doPartitions(map, md.getTotalBuckets, n) + } + } + + /** Converts server to bucket ID set list to array of RDD partitions */ + def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int) + : Array[Partition] = { + + // method that calculates the group size for splitting "k" items into "g" groups + def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt + + // 1. 
convert list of server and bucket set pairs to a list of server and sorted bucket set pairs + val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) } + + // 2. split bucket set of each server into n splits if possible, and server to Seq(server) + val srvToSplitedBuckeSet = srvToSortedBucketSet.flatMap { case (host, set) => + if (set.isEmpty) Nil else set.grouped(groupSize(set.size, n)).toList.map(s => (Seq(host), s)) } + + // 3. calculate empty bucket IDs by removing all bucket sets of all servers from the full bucket sets + val emptyIDs = SortedSet[Int]() ++ ((0 until totalBuckets).toSet /: srvToSortedBucketSet) {case (s1, (k, s2)) => s1 &~ s2} + + // 4. distribute empty bucket IDs to all partitions evenly. + // The empty buckets do not contain data when partitions are created, but they may contain data + // when RDD is materialized, so need to include those bucket IDs in the partitions. + val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitedBuckeSet + else srvToSplitedBuckeSet.zipAll( + emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitedBuckeSet.size)).toList, (Nil, Set.empty), Set.empty).map + { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) } + + // 5. create array of partitions w/ 0-based index + (0 until srvToFinalBucketSet.size).toList.zip(srvToFinalBucketSet).map + { case (i, (srv, set)) => new GemFireRDDPartition(i, set, srv) }.toArray + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala new file mode 100644 index 0000000..573902b --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala @@ -0,0 +1,55 @@ +package io.pivotal.gemfire.spark.connector.internal.rdd + +import com.gemstone.gemfire.cache.Region +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf +import org.apache.spark.{Logging, TaskContext} + +import scala.collection.Iterator +import collection.JavaConversions._ + +/** This trait provide some common code for pair and non-pair RDD writer */ +private[rdd] trait GemFireRDDWriterTraceUtils { + + def mapDump(map: Map[_, _], num: Int): String = { + val firstNum = map.take(num + 1) + if (firstNum.size > num) s"$firstNum ..." else s"$firstNum" + } +} + +/** + * Writer object that provides write function that saves non-pair RDD partitions to GemFire. + * Those functions will be executed on Spark executors. + * @param regionPath the full path of the region where the data is written to + */ +class GemFireRDDWriter[T, K, V] +(regionPath: String, connConf: GemFireConnectionConf) extends Serializable with GemFireRDDWriterTraceUtils with Logging { + + def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = { + val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath) + // todo. 
optimize batch size of putAll + val map: Map[K, V] = data.map(func).toMap + region.putAll(map) + logDebug(s"${map.size} entries are saved to region $regionPath") + logTrace(mapDump(map, 10)) + } +} + + +/** + * Writer object that provides write function that saves pair RDD partitions to GemFire. + * Those functions will be executed on Spark executors. + * @param regionPath the full path of the region where the data is written to + */ +class GemFirePairRDDWriter[K, V] +(regionPath: String, connConf: GemFireConnectionConf) extends Serializable with GemFireRDDWriterTraceUtils with Logging { + + def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = { + val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath) + // todo. optimize batch size of putAll + val map: Map[K, V] = data.toMap + region.putAll(map) + logDebug(s"${map.size} entries are saved to region $regionPath") + logTrace(mapDump(map, 10)) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala new file mode 100644 index 0000000..cff61d6 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala @@ -0,0 +1,122 @@ +package io.pivotal.gemfire.spark.connector.internal.rdd + +import scala.collection.Seq +import scala.reflect.ClassTag +import org.apache.spark.rdd.RDD +import org.apache.spark.{TaskContext, Partition, SparkContext} +import io.pivotal.gemfire.spark.connector.{GemFireConnectionConf, PreferredPartitionerPropKey} +import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._ + +/** + * This class exposes GemFire region as a RDD. + * @param sc the Spark Context + * @param regionPath the full path of the region + * @param connConf the GemFireConnectionConf to access the region + * @param opConf the parameters for this operation, such as preferred partitioner. + */ +class GemFireRegionRDD[K, V] private[connector] + (@transient sc: SparkContext, + val regionPath: String, + val connConf: GemFireConnectionConf, + val opConf: Map[String, String] = Map.empty, + val whereClause: Option[String] = None + ) (implicit ctk: ClassTag[K], ctv: ClassTag[V]) + extends RDD[(K, V)](sc, Seq.empty) { + + /** validate region existence when GemFireRDD object is created */ + validate() + + /** Validate region, and make sure it exists. */ + private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) + + def kClassTag = ctk + + def vClassTag = ctv + + /** + * method `copy` is used by method `where` that creates new immutable + * GemFireRDD instance based this instance. + */ + private def copy( + regionPath: String = regionPath, + connConf: GemFireConnectionConf = connConf, + opConf: Map[String, String] = opConf, + whereClause: Option[String] = None + ): GemFireRegionRDD[K, V] = { + + require(sc != null, + """RDD transformation requires a non-null SparkContext. Unfortunately + |SparkContext in this GemFireRDD is null. This can happen after + |GemFireRDD has been deserialized. 
SparkContext is not Serializable, + |therefore it deserializes to null. RDD transformations are not allowed + |inside lambdas used in other RDD transformations.""".stripMargin ) + + new GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf, whereClause) + } + + /** When where clause is specified, OQL query + * `select key, value from /.entries where ` + * is used to filter the dataset. + */ + def where(whereClause: Option[String]): GemFireRegionRDD[K, V] = { + if (whereClause.isDefined) copy(whereClause = whereClause) + else this + } + + /** this version is for Java API that doesn't use scala.Option */ + def where(whereClause: String): GemFireRegionRDD[K, V] = { + if (whereClause == null || whereClause.trim.isEmpty) this + else copy(whereClause = Option(whereClause.trim)) + } + + /** + * Use preferred partitioner generate partitions. `defaultReplicatedRegionPartitioner` + * will be used if it's a replicated region. + */ + override def getPartitions: Array[Partition] = { + val conn = connConf.getConnection + val md = conn.getRegionMetadata[K, V](regionPath) + md match { + case None => throw new RuntimeException(s"region $regionPath was not found.") + case Some(data) => + logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""") + val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner + val splits = p.partitions[K, V](conn, data, opConf) + logDebug(s"""RDD id=${this.id} region=$regionPath partitions=${splits.mkString(",")}""") + splits + } + } + + /** + * provide preferred location(s) (host name(s)) of the given partition. + * Only some partitioner implementation(s) provides this info, which is + * useful when Spark cluster and GemFire cluster share some hosts. + */ + override def getPreferredLocations(split: Partition) = + split.asInstanceOf[GemFireRDDPartition].locations + + /** + * Get preferred partitioner. return `defaultPartitionedRegionPartitioner` if none + * preference is specified. 
+ */ + private def preferredPartitioner = + GemFireRDDPartitioner(opConf.getOrElse( + PreferredPartitionerPropKey, GemFireRDDPartitioner.defaultPartitionedRegionPartitioner.name)) + + /** materialize a RDD partition */ + override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { + val partition = split.asInstanceOf[GemFireRDDPartition] + logDebug(s"compute RDD id=${this.id} partition $partition") + connConf.getConnection.getRegionData[K,V](regionPath, whereClause, partition) + // new InterruptibleIterator(context, split.asInstanceOf[GemFireRDDPartition[K, V]].iterator) + } +} + +object GemFireRegionRDD { + + def apply[K: ClassTag, V: ClassTag](sc: SparkContext, regionPath: String, + connConf: GemFireConnectionConf, opConf: Map[String, String] = Map.empty) + : GemFireRegionRDD[K, V] = + new GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala new file mode 100644 index 0000000..6d8ed59 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala @@ -0,0 +1,10 @@ +package io.pivotal.gemfire.spark.connector.javaapi + +import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD +import org.apache.spark.api.java.JavaPairRDD + +class GemFireJavaRegionRDD[K, V](rdd: GemFireRegionRDD[K, V]) extends JavaPairRDD[K, V](rdd)(rdd.kClassTag, rdd.vClassTag) { + + def where(whereClause: String): GemFireJavaRegionRDD[K, V] = new GemFireJavaRegionRDD(rdd.where(whereClause)) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala new file mode 100644 index 0000000..cf7b250 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala @@ -0,0 +1,35 @@ +package io.pivotal.gemfire.spark.connector.javaapi + +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} +import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream} + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +/** + * A helper class to make it possible to access components written in Scala from Java code. + */ +private[connector] object JavaAPIHelper { + + /** Returns a `ClassTag` of a given runtime class. */ + def getClassTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz) + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * see JavaSparkContext.fakeClassTag in Spark for more info. 
+ */ + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] + + /** Converts a Java `Properties` to a Scala immutable `Map[String, String]`. */ + def propertiesToScalaMap[K, V](props: java.util.Properties): Map[String, String] = + Map(props.toSeq: _*) + + /** convert a JavaRDD[(K,V)] to JavaPairRDD[K,V] */ + def toJavaPairRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = + JavaPairRDD.fromJavaRDD(rdd) + + /** convert a JavaDStream[(K,V)] to JavaPairDStream[K,V] */ + def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] = + JavaPairDStream.fromJavaDStream(ds) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala new file mode 100644 index 0000000..834d6a5 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala @@ -0,0 +1,43 @@ +package io.pivotal.gemfire.spark + +import io.pivotal.gemfire.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner} +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext + +import scala.reflect.ClassTag + +/** + * The root package of Gemfire connector for Apache Spark. + * Provides handy implicit conversions that add gemfire-specific + * methods to `SparkContext` and `RDD`. + */ +package object connector { + + /** constants */ + final val GemFireLocatorPropKey = "spark.gemfire.locators" + // partitioner related keys and values + final val PreferredPartitionerPropKey = "preferred.partitioner" + final val NumberPartitionsPerServerPropKey = "number.partitions.per.server" + final val OnePartitionPartitionerName = OnePartitionPartitioner.name + final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name + + implicit def toSparkContextFunctions(sc: SparkContext): GemFireSparkContextFunctions = + new GemFireSparkContextFunctions(sc) + + implicit def toSQLContextFunctions(sqlContext: SQLContext): GemFireSQLContextFunctions = + new GemFireSQLContextFunctions(sqlContext) + + implicit def toGemfirePairRDDFunctions[K: ClassTag, V: ClassTag] + (self: RDD[(K, V)]): GemFirePairRDDFunctions[K, V] = new GemFirePairRDDFunctions(self) + + implicit def toGemfireRDDFunctions[T: ClassTag] + (self: RDD[T]): GemFireRDDFunctions[T] = new GemFireRDDFunctions(self) + + /** utility implicits */ + + /** convert Map[String, String] to java.util.Properties */ + implicit def map2Properties(map: Map[String,String]): java.util.Properties = + (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala 
b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala new file mode 100644 index 0000000..91eb784 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala @@ -0,0 +1,61 @@ +package io.pivotal.gemfire.spark.connector.streaming + +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf +import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFirePairRDDWriter, GemFireRDDWriter} +import org.apache.spark.Logging +import org.apache.spark.api.java.function.PairFunction +import org.apache.spark.streaming.dstream.DStream + +/** + * Extra gemFire functions on DStream of non-pair elements through an implicit conversion. + * Import `io.pivotal.gemfire.spark.connector.streaming._` at the top of your program to + * use these functions. + */ +class GemFireDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging { + + /** + * Save the DStream of non-pair elements to GemFire key-value store. + * @param regionPath the full path of region that the DStream is stored + * @param func the function that converts elements of the DStream to key/value pairs + * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster + */ + def saveToGemfire[K, V]( + regionPath: String, func: T => (K, V), connConf: GemFireConnectionConf = defaultConnectionConf): Unit = { + connConf.getConnection.validateRegion[K, V](regionPath) + val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf) + logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") + dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _)) + } + + /** this version of saveToGemfire is just for Java API */ + def saveToGemfire[K, V](regionPath: String, func: PairFunction[T, K, V], connConf: GemFireConnectionConf): Unit = { + saveToGemfire[K, V](regionPath, func.call _, connConf) + } + + private[connector] def defaultConnectionConf: GemFireConnectionConf = + GemFireConnectionConf(dstream.context.sparkContext.getConf) +} + + +/** + * Extra gemFire functions on DStream of (key, value) pairs through an implicit conversion. + * Import `io.pivotal.gemfire.spark.connector.streaming._` at the top of your program to + * use these functions. 
+ */
+class GemFirePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging {
+
+  /**
+   * Save the DStream of pairs to GemFire key-value store without any conversion.
+   * @param regionPath the full path of the region where the DStream is stored
+   * @param connConf the GemFireConnectionConf object that provides the connection to the GemFire cluster
+   */
+  def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf)
+    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
+    dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(dstream.context.sparkContext.getConf)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
new file mode 100644
index 0000000..c003561
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
@@ -0,0 +1,16 @@
+package io.pivotal.gemfire.spark.connector
+
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Provides handy implicit conversions that add gemfire-specific methods to `DStream`.
+ */ +package object streaming { + + implicit def toGemFireDStreamFunctions[T](ds: DStream[T]): GemFireDStreamFunctions[T] = + new GemFireDStreamFunctions[T](ds) + + implicit def toGemFirePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GemFirePairDStreamFunctions[K, V] = + new GemFirePairDStreamFunctions[K, V](ds) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java b/gemfire-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java new file mode 100644 index 0000000..43415fa --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java @@ -0,0 +1,147 @@ +package io.pivotal.gemfire.spark.connector; + +import io.pivotal.gemfire.spark.connector.javaapi.*; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.SQLContext; +//import org.apache.spark.sql.api.java.JavaSQLContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.dstream.DStream; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import scala.Function1; +import scala.Function2; +import scala.Tuple2; +import scala.Tuple3; +import scala.collection.mutable.LinkedList; +import scala.reflect.ClassTag; + +import static org.junit.Assert.*; +import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + +public class JavaAPITest extends JUnitSuite { + + @SuppressWarnings( "unchecked" ) + public Tuple3 createCommonMocks() { + SparkContext mockSparkContext = mock(SparkContext.class); + GemFireConnectionConf mockConnConf = mock(GemFireConnectionConf.class); + GemFireConnection mockConnection = mock(GemFireConnection.class); + when(mockConnConf.getConnection()).thenReturn(mockConnection); + when(mockConnConf.locators()).thenReturn(new LinkedList()); + return new Tuple3<>(mockSparkContext, mockConnConf, mockConnection); + } + + @Test + public void testSparkContextFunction() throws Exception { + Tuple3 tuple3 = createCommonMocks(); + GemFireJavaSparkContextFunctions wrapper = javaFunctions(tuple3._1()); + assertTrue(tuple3._1() == wrapper.sc); + String regionPath = "testregion"; + JavaPairRDD rdd = wrapper.gemfireRegion(regionPath, tuple3._2()); + verify(tuple3._3()).validateRegion(regionPath); + } + + @Test + public void testJavaSparkContextFunctions() throws Exception { + SparkContext mockSparkContext = mock(SparkContext.class); + JavaSparkContext mockJavaSparkContext = mock(JavaSparkContext.class); + when(mockJavaSparkContext.sc()).thenReturn(mockSparkContext); + GemFireJavaSparkContextFunctions wrapper = javaFunctions(mockJavaSparkContext); + assertTrue(mockSparkContext == wrapper.sc); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaPairRDDFunctions() throws Exception { + JavaPairRDD mockPairRDD = mock(JavaPairRDD.class); + 
RDD> mockTuple2RDD = mock(RDD.class); + when(mockPairRDD.rdd()).thenReturn(mockTuple2RDD); + GemFireJavaPairRDDFunctions wrapper = javaFunctions(mockPairRDD); + assertTrue(mockTuple2RDD == wrapper.rddf.rdd()); + + Tuple3 tuple3 = createCommonMocks(); + when(mockTuple2RDD.sparkContext()).thenReturn(tuple3._1()); + String regionPath = "testregion"; + wrapper.saveToGemfire(regionPath, tuple3._2()); + verify(mockTuple2RDD, times(1)).sparkContext(); + verify(tuple3._1(), times(1)).runJob(eq(mockTuple2RDD), any(Function2.class), any(ClassTag.class)); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaRDDFunctions() throws Exception { + JavaRDD mockJavaRDD = mock(JavaRDD.class); + RDD mockRDD = mock(RDD.class); + when(mockJavaRDD.rdd()).thenReturn(mockRDD); + GemFireJavaRDDFunctions wrapper = javaFunctions(mockJavaRDD); + assertTrue(mockRDD == wrapper.rddf.rdd()); + + Tuple3 tuple3 = createCommonMocks(); + when(mockRDD.sparkContext()).thenReturn(tuple3._1()); + PairFunction mockPairFunc = mock(PairFunction.class); + String regionPath = "testregion"; + wrapper.saveToGemfire(regionPath, mockPairFunc, tuple3._2()); + verify(mockRDD, times(1)).sparkContext(); + verify(tuple3._1(), times(1)).runJob(eq(mockRDD), any(Function2.class), any(ClassTag.class)); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaPairDStreamFunctions() throws Exception { + JavaPairDStream mockJavaDStream = mock(JavaPairDStream.class); + DStream> mockDStream = mock(DStream.class); + when(mockJavaDStream.dstream()).thenReturn(mockDStream); + GemFireJavaPairDStreamFunctions wrapper = javaFunctions(mockJavaDStream); + assertTrue(mockDStream == wrapper.dsf.dstream()); + + Tuple3 tuple3 = createCommonMocks(); + String regionPath = "testregion"; + wrapper.saveToGemfire(regionPath, tuple3._2()); + verify(tuple3._2()).getConnection(); + verify(tuple3._3()).validateRegion(regionPath); + verify(mockDStream).foreachRDD(any(Function1.class)); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaPairDStreamFunctionsWithTuple2DStream() throws Exception { + JavaDStream> mockJavaDStream = mock(JavaDStream.class); + DStream> mockDStream = mock(DStream.class); + when(mockJavaDStream.dstream()).thenReturn(mockDStream); + GemFireJavaPairDStreamFunctions wrapper = javaFunctions(toJavaPairDStream(mockJavaDStream)); + assertTrue(mockDStream == wrapper.dsf.dstream()); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaDStreamFunctions() throws Exception { + JavaDStream mockJavaDStream = mock(JavaDStream.class); + DStream mockDStream = mock(DStream.class); + when(mockJavaDStream.dstream()).thenReturn(mockDStream); + GemFireJavaDStreamFunctions wrapper = javaFunctions(mockJavaDStream); + assertTrue(mockDStream == wrapper.dsf.dstream()); + + Tuple3 tuple3 = createCommonMocks(); + PairFunction mockPairFunc = mock(PairFunction.class); + String regionPath = "testregion"; + wrapper.saveToGemfire(regionPath, mockPairFunc, tuple3._2()); + verify(tuple3._2()).getConnection(); + verify(tuple3._3()).validateRegion(regionPath); + verify(mockDStream).foreachRDD(any(Function1.class)); + } + + @Test + public void testSQLContextFunction() throws Exception { + SQLContext mockSQLContext = mock(SQLContext.class); + GemFireJavaSQLContextFunctions wrapper = javaFunctions(mockSQLContext); + assertTrue(wrapper.scf.getClass() == GemFireSQLContextFunctions.class); + } + +} \ No newline at end of file 
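
For readers following the API surface exercised by JavaAPITest above, the Scala side is meant to be used through the implicit conversions declared in the connector package object earlier in this commit. The sketch below is illustrative only and is not part of the commit: the region names are hypothetical, and the Scala signatures of gemfireRegion and saveToGemfire are assumed by analogy with the Java wrappers tested above; only GemFireLocatorPropKey, GemFireConnectionConf(SparkConf), and GemFireRegionRDD.where(String) appear verbatim in this diff.

// Illustrative usage sketch (not part of this commit). Region names are
// hypothetical; the gemfireRegion/saveToGemfire signatures are assumed from
// the Java wrappers exercised in JavaAPITest.
import org.apache.spark.{SparkConf, SparkContext}
import io.pivotal.gemfire.spark.connector._   // implicit conversions + GemFireLocatorPropKey

object ConnectorUsageSketch {
  def main(args: Array[String]): Unit = {
    // Locators are passed through the Spark configuration, per the package object.
    val sparkConf = new SparkConf()
      .setAppName("gemfire-connector-sketch")
      .set(GemFireLocatorPropKey, "localhost[10334]")   // assumed host[port] locator format
    val sc = new SparkContext(sparkConf)
    val connConf = GemFireConnectionConf(sc.getConf)

    // Expose a GemFire region as an RDD[(K, V)] and narrow it with an OQL where clause
    // (GemFireRegionRDD.where in this commit).
    val regionRDD = sc.gemfireRegion[String, String]("str_str_region", connConf)  // assumed Scala API
      .where("value.length > 3")
    println(s"rows: ${regionRDD.count()}")

    // Write a pair RDD back to a region via the implicit GemFirePairRDDFunctions.
    val pairs = sc.parallelize(Seq("a" -> "1", "b" -> "2"))
    pairs.saveToGemfire("out_region", connConf)   // assumed Scala API

    sc.stop()
  }
}
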
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala new file mode 100644 index 0000000..bdd4fbd --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala @@ -0,0 +1,42 @@ +package io.pivotal.gemfire.spark.connector + +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSuite, Matchers} +import org.apache.commons.httpclient.HttpClient +import java.io.File + + +class GemFireFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar { + val mockHttpClient: HttpClient = mock[HttpClient] + + test("jmx url creation") { + val jmxHostAndPort = "localhost:7070" + val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed" + val gfd = new GemFireFunctionDeployer(mockHttpClient); + val urlString = gfd.constructURLString(jmxHostAndPort) + assert(urlString === expectedUrlString) + } + + test("missing jar file") { + val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" + val gfd = new GemFireFunctionDeployer(mockHttpClient); + intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)} + } + + test("deploy with missing jar") { + val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" + val gfd = new GemFireFunctionDeployer(mockHttpClient); + intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))} + intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))} + } + + test("successful mocked deploy") { + val gfd = new GemFireFunctionDeployer(mockHttpClient); + val jar = new File("README.md"); + assert(gfd.deploy("localhost:7070", jar).contains("Deployed")) + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala new file mode 100644 index 0000000..c644aa9 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala @@ -0,0 +1,66 @@ +package io.pivotal.gemfire.spark.connector.internal + +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSuite, Matchers} + +class DefaultGemFireConnectionManagerTest extends FunSuite with Matchers with MockitoSugar { + + test("DefaultGemFireConnectionFactory get/closeConnection") { + // note: connConf 1-4 share the 
same set of locators + val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234))) + val connConf2 = new GemFireConnectionConf(Seq(("host2", 5678))) + val connConf3 = new GemFireConnectionConf(Seq(("host1", 1234), ("host2", 5678))) + val connConf4 = new GemFireConnectionConf(Seq(("host2", 5678), ("host1", 1234))) + val connConf5 = new GemFireConnectionConf(Seq(("host5", 3333))) + + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory] + val mockConn1 = mock[DefaultGemFireConnection] + val mockConn2 = mock[DefaultGemFireConnection] + when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1) + when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2) + + assert(DefaultGemFireConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1) + // note: following 3 lines do not trigger connFactory.newConnection(...) + assert(DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) + assert(DefaultGemFireConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1) + assert(DefaultGemFireConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1) + assert(DefaultGemFireConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2) + + // connFactory.newConnection(...) were invoked only twice + verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props) + verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props) + assert(DefaultGemFireConnectionManager.connections.size == 3) + + DefaultGemFireConnectionManager.closeConnection(connConf1) + assert(DefaultGemFireConnectionManager.connections.size == 1) + DefaultGemFireConnectionManager.closeConnection(connConf5) + assert(DefaultGemFireConnectionManager.connections.isEmpty) + } + + test("DefaultGemFireConnectionFactory newConnection(...) 
throws RuntimeException") { + val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234))) + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory] + when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException()) + intercept[RuntimeException] { DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) } + verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props) + } + + test("DefaultGemFireConnectionFactory close() w/ non-exist connection") { + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory] + val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234))) + val connConf2 = new GemFireConnectionConf(Seq(("host2", 5678))) + val mockConn1 = mock[DefaultGemFireConnection] + when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1) + assert(DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) + assert(DefaultGemFireConnectionManager.connections.size == 1) + // connection does not exists in the connection manager + DefaultGemFireConnectionManager.closeConnection(connConf2) + assert(DefaultGemFireConnectionManager.connections.size == 1) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala new file mode 100644 index 0000000..04294e7 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala @@ -0,0 +1,238 @@ +package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions + +import com.gemstone.gemfire.DataSerializer +import com.gemstone.gemfire.cache.execute.{ResultCollector, ResultSender} +import com.gemstone.gemfire.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl} +import com.gemstone.gemfire.cache.query.types.ObjectType +import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput, HeapDataOutputStream} +import com.gemstone.gemfire.internal.cache.{CachedDeserializable, CachedDeserializableFactory} +import org.scalatest.{BeforeAndAfter, FunSuite} +import scala.collection.JavaConversions._ +import scala.concurrent.{Await, ExecutionContext, Future} +import ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter { + + /** + * A test ResultSender that connects struct ResultSender and ResultCollector + * Note: this test ResultSender has to copy the data (byte array) since the + * StructStreamingResultSender will reuse the byte array. 
+ */ + class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] { + + var finishedNum = 0 + + override def sendResult(result: Object): Unit = + collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) + + /** exceptions should be sent via lastResult() */ + override def sendException(throwable: Throwable): Unit = + throw new UnsupportedOperationException("sendException is not supported.") + + override def lastResult(result: Object): Unit = { + collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) + this.synchronized { + finishedNum += 1 + if (finishedNum == num) + collector.endResults() + } + } + } + + /** common variables */ + var collector: StructStreamingResultCollector = _ + var baseSender: LocalResultSender = _ + /** common types */ + val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType] + val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType)) + val OneColType = new StructTypeImpl(Array("value"), Array(objType)) + + before { + collector = new StructStreamingResultCollector + baseSender = new LocalResultSender(collector, 1) + } + + test("transfer simple data") { + verifySimpleTransfer(sendDataType = true) + } + + test("transfer simple data with no type info") { + verifySimpleTransfer(sendDataType = false) + } + + def verifySimpleTransfer(sendDataType: Boolean): Unit = { + val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator + val dataType = if (sendDataType) TwoColType else null + new StructStreamingResultSender(baseSender, dataType, iter).send() + // println("type: " + collector.getResultType.toString) + assert(TwoColType.equals(collector.getResultType)) + val iter2 = collector.getResult + (0 to 9).foreach { i => + assert(iter2.hasNext) + val o = iter2.next() + assert(o.size == 2) + assert(o(0).asInstanceOf[Int] == i) + assert(o(1).asInstanceOf[String] == i.toString * 5) + } + assert(! iter2.hasNext) + } + + + /** + * A test iterator that generates integer data + * @param start the 1st value + * @param n number of integers generated + * @param genExcp generates an Exception if true. This is used to test exception handling. + */ + def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = { + new Iterator[Array[Object]] { + val max = if (genExcp) start + n else start + n - 1 + var index: Int = start - 1 + + override def hasNext: Boolean = if (index < max) true else false + + override def next(): Array[Object] = + if (index < (start + n - 1)) { + index += 1 + Array(index.asInstanceOf[Object]) + } else throw new RuntimeException("simulated error") + } + } + + test("transfer data with 0 rows") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(collector.getResultType == null) + val iter = collector.getResult + assert(! iter.hasNext) + } + + test("transfer data with 10K rows") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + (1 to 10000).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(o(0).asInstanceOf[Int] == i) + } + assert(! 
iter.hasNext) + } + + test("transfer data with 300 rows with 2 senders") { + baseSender = new LocalResultSender(collector, 2) + val total = 300 + val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} + val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()} + Await.result(sender1, 1.seconds) + Await.result(sender2, 1.seconds) + + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + val set = scala.collection.mutable.Set[Int]() + (1 to total).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(! set.contains(o(0).asInstanceOf[Int])) + set.add(o(0).asInstanceOf[Int]) + } + assert(! iter.hasNext) + } + + test("transfer data with 1000 rows with 2 senders with error") { + baseSender = new LocalResultSender(collector, 2) + val total = 1000 + val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} + val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()} + Await.result(sender1, 1.seconds) + Await.result(sender2, 1.seconds) + + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + val set = scala.collection.mutable.Set[Int]() + intercept[RuntimeException] { + (1 to total).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(! set.contains(o(0).asInstanceOf[Int])) + set.add(o(0).asInstanceOf[Int]) + } + } + // println(s"rows received: ${set.size}") + } + + test("transfer data with Exception") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send() + // println("type: " + collector.getResultType.toString) + val iter = collector.getResult + intercept[RuntimeException] ( iter.foreach(_.mkString(",")) ) + } + + def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] = + intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}")) + + test("transfer string pair data with 1000 rows") { + new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(TwoColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + (1 to 1000).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 2) + assert(o(0) == s"key-$i") + assert(o(1) == s"value-$i") + } + assert(! iter.hasNext) + } + + /** + * Usage notes: There are 3 kinds of data to transfer: + * (1) object, (2) byte array of serialized object, and (3) byte array. + * This test shows how to handle all of them. + */ + test("DataSerializer usage") { + val outBuf = new HeapDataOutputStream(1024, null) + val inBuf = new ByteArrayDataInput() + + // 1. a regular object + val hello = "Hello World!" 
* 30 + // serialize the data + DataSerializer.writeObject(hello, outBuf) + val bytesHello = outBuf.toByteArray.clone() + // de-serialize the data + inBuf.initialize(bytesHello, Version.CURRENT) + val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object] + assert(hello == hello2) + + // 2. byte array of serialized object + // serialize: byte array from `CachedDeserializable` + val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello) + outBuf.reset() + DataSerializer.writeByteArray(cd.getSerializedValue, outBuf) + // de-serialize the data in 2 steps + inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) + val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf) + inBuf.initialize(bytesHello2, Version.CURRENT) + val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object] + assert(hello == hello3) + + // 3. byte array + outBuf.reset() + DataSerializer.writeByteArray(bytesHello, outBuf) + inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) + val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf) + assert(bytesHello sameElements bytesHello3) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala new file mode 100644 index 0000000..86210b6 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala @@ -0,0 +1,67 @@ +package io.pivotal.gemfire.spark.connector.internal.oql + +import org.scalatest.FunSuite + +class QueryParserTest extends FunSuite { + + test("select * from /r1") { + val r = QueryParser.parseOQL("select * from /r1").get + assert(r == "List(/r1)") + } + + test("select c2 from /r1") { + val r = QueryParser.parseOQL("select c2 from /r1").get + assert(r == "List(/r1)") + } + + test("select key, value from /r1.entries") { + val r = QueryParser.parseOQL("select key, value from /r1.entries").get + assert(r == "List(/r1.entries)") + } + + test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") { + val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get + assert(r == "List(/r1)") + } + + test("select * from /r1/r2 where c1 >= 200") { + val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get + assert(r == "List(/r1/r2)") + } + + test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") { + val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get + assert(r == "List(/r1/r2, /r3/r4)") + } + + test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") { + val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get + assert(r == "List(/r1/r2)") + } + + test("IMPORT io.pivotal.gemfire IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") { + val r = QueryParser.parseOQL("IMPORT io.pivotal.gemfire IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY 
id desc").get + assert(r == "List(/root/sub.entries)") + } + + test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") { + val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get + assert(r == "List(/region)") + } + + test("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID") { + val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID").get + assert(r == "List(/QueryRegion1, /QueryRegion2)") + } + + test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") { + val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get + println("r.type=" + r.getClass.getName + " r=" + r) + assert(r == "List(/obj_obj_region)") + } + + test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") { + val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get + assert(r == "List(/obj_obj_region, r.positions.values)") + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala new file mode 100644 index 0000000..c50bbc7 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala @@ -0,0 +1,34 @@ +package unittest.io.pivotal.gemfire.spark.connector + +import io.pivotal.gemfire.spark.connector._ +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar +import org.scalatest.Matchers + +class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar { + + test("implicit map2Properties") { + verifyProperties(Map.empty) + verifyProperties(Map("One" -> "1", "Two" -> "2", "Three" ->"3")) + } + + def verifyProperties(map: Map[String, String]): Unit = { + val props: java.util.Properties = map + assert(props.size() == map.size) + map.foreach(p => assert(props.getProperty(p._1) == p._2)) + } + + test("Test Implicit SparkContext Conversion") { + val mockSparkContext = mock[SparkContext] + val gfscf: GemFireSparkContextFunctions = mockSparkContext + assert(gfscf.isInstanceOf[GemFireSparkContextFunctions]) + } + + test("Test Implicit SQLContext Conversion") { + val mockSQLContext = mock[SQLContext] + val gfscf: GemFireSQLContextFunctions = mockSQLContext + assert(gfscf.isInstanceOf[GemFireSQLContextFunctions]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala ---------------------------------------------------------------------- diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala new file mode 100644 index 0000000..2c64e19 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala @@ -0,0 +1,84 @@ +package unittest.io.pivotal.gemfire.spark.connector + +import org.apache.spark.SparkConf +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSuite} +import io.pivotal.gemfire.spark.connector._ + +class GemFireConnectionConfTest extends FunSuite with Matchers with MockitoSugar { + + test("apply(SparkConf) w/ GemFireLocator property and empty gemfireProps") { + val (host1, port1) = ("host1", 1234) + val (host2, port2) = ("host2", 5678) + val conf = new SparkConf().set(GemFireLocatorPropKey, s"$host1[$port1],$host2[$port2]") + val connConf = GemFireConnectionConf(conf) + assert(connConf.locators == Seq((host1, port1),(host2, port2))) + assert(connConf.gemfireProps.isEmpty) + } + + test("apply(SparkConf) w/ GemFireLocator property and gemfire properties") { + val (host1, port1) = ("host1", 1234) + val (host2, port2) = ("host2", 5678) + val (propK1, propV1) = ("ack-severe-alert-threshold", "1") + val (propK2, propV2) = ("ack-wait-threshold", "10") + val conf = new SparkConf().set(GemFireLocatorPropKey, s"$host1[$port1],$host2[$port2]") + .set(s"spark.gemfire.$propK1", propV1).set(s"spark.gemfire.$propK2", propV2) + val connConf = GemFireConnectionConf(conf) + assert(connConf.locators == Seq((host1, port1),(host2, port2))) + assert(connConf.gemfireProps == Map(propK1 -> propV1, propK2 -> propV2)) + } + + test("apply(SparkConf) w/o GemFireLocator property") { + intercept[RuntimeException] { GemFireConnectionConf(new SparkConf()) } + } + + test("apply(SparkConf) w/ invalid GemFireLocator property") { + val conf = new SparkConf().set(GemFireLocatorPropKey, "local^host:1234") + intercept[Exception] { GemFireConnectionConf(conf) } + } + + test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non gemfireProps") { + val (host1, port1) = ("host1", 1234) + val connConf = GemFireConnectionConf(s"$host1:$port1") + assert(connConf.locators == Seq((host1, port1))) + assert(connConf.gemfireProps.isEmpty) + } + + test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non-empty gemfireProps") { + val (host1, port1) = ("host1", 1234) + val (host2, port2) = ("host2", 5678) + val (propK1, propV1) = ("ack-severe-alert-threshold", "1") + val (propK2, propV2) = ("ack-wait-threshold", "10") + val props = Map(propK1 -> propV1, propK2 -> propV2) + val connConf = GemFireConnectionConf(s"$host1:$port1,$host2:$port2", props) + assert(connConf.locators == Seq((host1, port1),(host2, port2))) + assert(connConf.gemfireProps == props) + } + + test("apply(locatorStr, gemfireProps) w/ invalid locatorStr") { + intercept[Exception] { GemFireConnectionConf("local~host:4321") } + } + + test("constructor w/ empty (host,port) pairs") { + intercept[IllegalArgumentException] { new GemFireConnectionConf(Seq.empty) } + } + + test("getConnection() normal") { + implicit val mockFactory = mock[GemFireConnectionManager] + val mockConnection = mock[GemFireConnection] + when(mockFactory.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenReturn(mockConnection) + val 
connConf = GemFireConnectionConf("localhost:1234") + assert(connConf.getConnection == mockConnection) + verify(mockFactory).getConnection(connConf) + } + + test("getConnection() failure") { + implicit val mockFactory = mock[GemFireConnectionManager] + when(mockFactory.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenThrow(new RuntimeException) + val connConf = GemFireConnectionConf("localhost:1234") + intercept[RuntimeException] { connConf.getConnection } + verify(mockFactory).getConnection(connConf) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala new file mode 100644 index 0000000..ef4728c --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala @@ -0,0 +1,63 @@ +package unittest.io.pivotal.gemfire.spark.connector + +import com.gemstone.gemfire.cache.Region +import io.pivotal.gemfire.spark.connector.{GemFireConnection, GemFireConnectionConf} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.DStream +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSuite} +import org.mockito.Matchers.{eq => mockEq, any => mockAny} + +import scala.reflect.ClassTag + +class GemFireDStreamFunctionsTest extends FunSuite with Matchers with MockitoSugar { + + test("test GemFirePairDStreamFunctions Implicit") { + import io.pivotal.gemfire.spark.connector.streaming._ + val mockDStream = mock[DStream[(Int, String)]] + // the implicit makes the following line valid + val pairDStream: GemFirePairDStreamFunctions[Int, String] = mockDStream + pairDStream shouldBe a[GemFirePairDStreamFunctions[_, _]] + } + + test("test GemFireDStreamFunctions Implicit") { + import io.pivotal.gemfire.spark.connector.streaming._ + val mockDStream = mock[DStream[String]] + // the implicit makes the following line valid + val dstream: GemFireDStreamFunctions[String] = mockDStream + dstream shouldBe a[GemFireDStreamFunctions[_]] + } + + def createMocks[K, V](regionPath: String) + (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]) + : (String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = { + val mockConnection = mock[GemFireConnection] + val mockConnConf = mock[GemFireConnectionConf] + val mockRegion = mock[Region[K, V]] + when(mockConnConf.getConnection).thenReturn(mockConnection) + when(mockConnConf.locators).thenReturn(Seq.empty) + (regionPath, mockConnConf, mockConnection, mockRegion) + } + + test("test GemFirePairDStreamFunctions.saveToGemfire()") { + import io.pivotal.gemfire.spark.connector.streaming._ + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") + val mockDStream = mock[DStream[(String, String)]] + mockDStream.saveToGemfire(regionPath, mockConnConf) + verify(mockConnConf).getConnection + verify(mockConnection).validateRegion[String, String](regionPath) + verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => 
Unit]) + } + + test("test GemFireDStreamFunctions.saveToGemfire()") { + import io.pivotal.gemfire.spark.connector.streaming._ + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, Int]("test") + val mockDStream = mock[DStream[String]] + mockDStream.saveToGemfire[String, Int](regionPath, (s: String) => (s, s.length), mockConnConf) + verify(mockConnConf).getConnection + verify(mockConnection).validateRegion[String, String](regionPath) + verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit]) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala new file mode 100644 index 0000000..659fca2 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala @@ -0,0 +1,99 @@ +package unittest.io.pivotal.gemfire.spark.connector + +import com.gemstone.gemfire.cache.Region +import io.pivotal.gemfire.spark.connector._ +import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDWriter, GemFirePairRDDWriter} +import org.apache.spark.{TaskContext, SparkContext} +import org.apache.spark.rdd.RDD +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSuite, Matchers} +import collection.JavaConversions._ +import scala.reflect.ClassTag +import org.mockito.Matchers.{eq => mockEq, any => mockAny} + +class GemFireRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar { + + test("test PairRDDFunction Implicit") { + import io.pivotal.gemfire.spark.connector._ + val mockRDD = mock[RDD[(Int, String)]] + // the implicit makes the following line valid + val pairRDD: GemFirePairRDDFunctions[Int, String] = mockRDD + pairRDD shouldBe a [GemFirePairRDDFunctions[_, _]] + } + + test("test RDDFunction Implicit") { + import io.pivotal.gemfire.spark.connector._ + val mockRDD = mock[RDD[String]] + // the implicit makes the following line valid + val nonPairRDD: GemFireRDDFunctions[String] = mockRDD + nonPairRDD shouldBe a [GemFireRDDFunctions[_]] + } + + def createMocks[K, V](regionPath: String) + (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): (String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = { + val mockConnection = mock[GemFireConnection] + val mockConnConf = mock[GemFireConnectionConf] + val mockRegion = mock[Region[K, V]] + when(mockConnConf.getConnection).thenReturn(mockConnection) + when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion) + // mockRegion shouldEqual mockConn.getRegionProxy[K, V](regionPath) + (regionPath, mockConnConf, mockConnection, mockRegion) + } + + test("test GemFirePairRDDWriter") { + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") + val writer = new GemFirePairRDDWriter[String, String](regionPath, mockConnConf) + val data = List(("1", "one"), ("2", "two"), ("3", "three")) + writer.write(null, data.toIterator) + val expectedMap: Map[String, String] = data.toMap + 
verify(mockRegion).putAll(expectedMap) + } + + test("test GemFireNonPairRDDWriter") { + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test") + val writer = new GemFireRDDWriter[String, Int, String](regionPath, mockConnConf) + val data = List("a", "ab", "abc") + val f: String => (Int, String) = s => (s.length, s) + writer.write(f)(null, data.toIterator) + val expectedMap: Map[Int, String] = data.map(f).toMap + verify(mockRegion).putAll(expectedMap) + } + + test("test PairRDDFunctions.saveToGemfire") { + import io.pivotal.gemfire.spark.connector._ + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") + val mockRDD = mock[RDD[(String, String)]] + val mockSparkContext = mock[SparkContext] + when(mockRDD.sparkContext).thenReturn(mockSparkContext) + val result = mockRDD.saveToGemfire(regionPath, mockConnConf) + verify(mockConnection, times(1)).validateRegion[String, String](regionPath) + result === Unit + verify(mockSparkContext, times(1)).runJob[(String, String), Unit]( + mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => Unit])(mockAny(classOf[ClassTag[Unit]])) + + // Note: the current implementation makes the following code not compilable, + // so there is no negative test for this case + // val rdd: RDD[(K, V)] = ... + // rdd.saveToGemfire(regionPath, s => (s.length, s)) + } + + test("test RDDFunctions.saveToGemfire") { + import io.pivotal.gemfire.spark.connector._ + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test") + val mockRDD = mock[RDD[(String)]] + val mockSparkContext = mock[SparkContext] + when(mockRDD.sparkContext).thenReturn(mockSparkContext) + val result = mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf) + verify(mockConnection, times(1)).validateRegion[Int, String](regionPath) + result === Unit + verify(mockSparkContext, times(1)).runJob[String, Unit]( + mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => Unit])(mockAny(classOf[ClassTag[Unit]])) + + // Note: the current implementation makes the following code not compilable, + // so there is no negative test for this case + // val rdd: RDD[T] = ... 
// T is not a (K, V) tuple + // rdd.saveToGemfire(regionPath) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala new file mode 100644 index 0000000..508666a --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala @@ -0,0 +1,75 @@ +package unittest.io.pivotal.gemfire.spark.connector + +import io.pivotal.gemfire.spark.connector.internal.LocatorHelper +import org.scalatest.FunSuite + +class LocatorHelperTest extends FunSuite { + + test("locatorStr2HostPortPair hostname w/o domain") { + val (host, port) = ("localhost", 10334) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, port)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, port)) + } + + test("locatorStr2HostPortPair hostname w/ domain") { + val (host, port) = ("localhost", 10334) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, port)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, port)) + } + + test("locatorStr2HostPortPair w/ invalid host name") { + // empty or null locatorStr + assert(LocatorHelper.locatorStr2HostPortPair("").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair(null).isFailure) + // host name has leading `.` + assert(LocatorHelper.locatorStr2HostPortPair(".localhost.1234").isFailure) + // host name has leading and/or tail white space + assert(LocatorHelper.locatorStr2HostPortPair(" localhost.1234").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair("localhost .1234").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair(" localhost .1234").isFailure) + // host name contain invalid characters + assert(LocatorHelper.locatorStr2HostPortPair("local%host.1234").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair("localhost*.1234").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair("^localhost.1234").isFailure) + } + + test("locatorStr2HostPortPair w/ valid port") { + val host = "192.168.0.1" + // port has 2, 3, 4, 5 digits + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:20").get ==(host, 20)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:300").get ==(host, 300)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:4000").get ==(host, 4000)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:50000").get ==(host, 50000)) + } + + test("locatorStr2HostPortPair w/ invalid port") { + // port number is less than 2 digits + assert(LocatorHelper.locatorStr2HostPortPair("locslhost.9").isFailure) + // port number is more than 5 digits + assert(LocatorHelper.locatorStr2HostPortPair("locslhost.100000").isFailure) + // port number is invalid + assert(LocatorHelper.locatorStr2HostPortPair("locslhost.1xx1").isFailure) + } + + test("parseLocatorsString with valid locator(s)") { + val (host1, port1) = ("localhost", 10334) + assert(LocatorHelper.parseLocatorsString(s"$host1:$port1") == Seq((host1, port1))) + val (host2, port2) = ("localhost2", 10335) + 
assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2") == Seq((host1, port1),(host2, port2))) + val (host3, port3) = ("localhost2", 10336) + assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2,$host3:$port3") == + Seq((host1, port1),(host2, port2),(host3, port3))) + } + + test("parseLocatorsString with invalid locator(s)") { + // empty and null locatorsStr + intercept[Exception] { LocatorHelper.parseLocatorsString("") } + intercept[Exception] { LocatorHelper.parseLocatorsString(null) } + // 1 bad locatorStr + intercept[Exception] { LocatorHelper.parseLocatorsString("local%host.1234") } + // 1 good locatorStr and 1 bad locatorStr + intercept[Exception] { LocatorHelper.parseLocatorsString("localhost:2345,local%host.1234") } + intercept[Exception] { LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") } + } + +}
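For readers following the tests above, here is a minimal usage sketch of the locator parsing and connection configuration APIs they exercise. It is illustrative only: the object name LocatorParsingSketch is hypothetical, it assumes the connector classes added in this patch are on the classpath, and it assumes locatorStr2HostPortPair returns a scala.util.Try, as the .get/.isFailure assertions in LocatorHelperTest suggest.

import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
import io.pivotal.gemfire.spark.connector.internal.LocatorHelper

object LocatorParsingSketch {
  def main(args: Array[String]): Unit = {
    // Parse a comma-separated "host:port" list into (host, port) pairs,
    // as asserted in "parseLocatorsString with valid locator(s)".
    val locators = LocatorHelper.parseLocatorsString("host1:10334,host2:10335")
    println(locators)  // Seq((host1,10334), (host2,10335))

    // A single locator can be written as "host:port" or "host[port]";
    // malformed input surfaces as a Failure rather than a thrown exception.
    println(LocatorHelper.locatorStr2HostPortPair("host1[10334]"))              // Success((host1,10334))
    println(LocatorHelper.locatorStr2HostPortPair("local%host.1234").isFailure) // true

    // The same locator string format drives GemFireConnectionConf, as shown in
    // GemFireConnectionConfTest; the gemfire properties map is optional.
    val connConf = GemFireConnectionConf("host1:10334,host2:10335", Map("ack-wait-threshold" -> "10"))
    println(connConf.locators)      // Seq((host1,10334), (host2,10335))
    println(connConf.gemfireProps)  // Map(ack-wait-threshold -> 10)
  }
}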