From: donald@apache.org
To: commits@predictionio.incubator.apache.org
Date: Thu, 16 Mar 2017 14:52:15 -0000
Message-Id: <8a8d44148e2242259366cbbcd120da86@git.apache.org>
Subject: [2/7] incubator-predictionio git commit: Move storage-dependent tests to each storage project

Move storage-dependent tests to each storage project

Closes #362

Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/647b480b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/647b480b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/647b480b

Branch: refs/heads/feature/xbuild
Commit: 647b480b0f0f693ec1b4d04dc9a2cf1492f0a0fe
Parents: 606be41
Author: Naoki Takezoe
Authored: Wed Mar 15 09:42:30 2017 -0700
Committer: Donald Szeto
Committed: Wed Mar 15 09:42:30 2017 -0700
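Before the patch body, one point of context: both the deleted shared specs and their per-project replacements resolve a concrete backend at runtime through the Storage registry, so relocating a spec only narrows which backends that project exercises. A minimal Scala sketch of that lookup, reusing the source names declared by the StorageTestUtils objects later in this patch (the wrapper object itself is illustrative, not part of the commit):

import org.apache.predictionio.data.storage.{LEvents, Storage}

// Illustrative sketch: how the relocated specs obtain a concrete
// LEvents implementation. "HBASE" and "PGSQL" are the source names
// declared in the per-project StorageTestUtils objects below.
object BackendLookupSketch {
  val dbName = "test_pio_storage_events_" + hashCode
  def hbaseEvents: LEvents = Storage.getDataObject[LEvents]("HBASE", dbName)
  def jdbcEvents: LEvents = Storage.getDataObject[LEvents]("PGSQL", dbName)
}

Because resolution is by source name, each storage project can run its spec against only the backend it actually ships.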
----------------------------------------------------------------------
 .../predictionio/data/storage/LEventsSpec.scala |  248 -----------------
 .../predictionio/data/storage/PEventsSpec.scala |  213 ---------------
 .../data/storage/StorageTestUtils.scala         |   45 ----
 .../data/storage/hbase/LEventsSpec.scala        |  239 +++++++++++++++++
 .../data/storage/hbase/PEventsSpec.scala        |  192 +++++++++++++
 .../data/storage/hbase/StorageTestUtils.scala   |   40 +++
 .../data/storage/hbase/TestEvents.scala         |  266 +++++++++++++++++++
 .../data/storage/jdbc/LEventsSpec.scala         |  236 ++++++++++++++++
 .../data/storage/jdbc/PEventsSpec.scala         |  193 ++++++++++++++
 .../data/storage/jdbc/StorageTestUtils.scala    |   29 ++
 .../data/storage/jdbc/TestEvents.scala          |  266 +++++++++++++++++++
 11 files changed, 1461 insertions(+), 506 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
deleted file mode 100644
index 3938072..0000000
--- a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - - -package org.apache.predictionio.data.storage - -import org.specs2._ -import org.specs2.specification.Step - -class LEventsSpec extends Specification with TestEvents { - def is = s2""" - - PredictionIO Storage LEvents Specification - - Events can be implemented by: - - HBLEvents ${hbEvents} - - JDBCLEvents ${jdbcLEvents} - - """ - - def hbEvents = sequential ^ s2""" - - HBLEvents should - - behave like any LEvents implementation ${events(hbDO)} - - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))} - - """ - - def jdbcLEvents = sequential ^ s2""" - - JDBCLEvents should - - behave like any LEvents implementation ${events(jdbcDO)} - - """ - - val appId = 1 - - def events(eventClient: LEvents) = sequential ^ s2""" - - init default ${initDefault(eventClient)} - insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)} - insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)} - insert and delete by ID ${insertAndDelete(eventClient)} - insert test user events ${insertTestUserEvents(eventClient)} - find user events ${findUserEvents(eventClient)} - aggregate user properties ${aggregateUserProperties(eventClient)} - aggregate one user properties ${aggregateOneUserProperties(eventClient)} - aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)} - init channel ${initChannel(eventClient)} - insert 2 events to channel ${insertChannel(eventClient)} - insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)} - find events from channel ${findChannel(eventClient)} - remove default ${removeDefault(eventClient)} - remove channel ${removeChannel(eventClient)} - - """ - - val dbName = "test_pio_storage_events_" + hashCode - def hbDO = Storage.getDataObject[LEvents]( - StorageTestUtils.hbaseSourceName, - dbName - ) - - def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName) - - def initDefault(eventClient: LEvents) = { - eventClient.init(appId) - } - - def insertAndGetEvents(eventClient: LEvents) = { - - // events from TestEvents trait - val listOfEvents = List(r1,r2,r3) - - val insertResp = listOfEvents.map { eventClient.insert(_, appId) } - - val insertedEventId: List[String] = insertResp - - val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) - .map { case (e, id) => Some(e.copy(eventId = Some(id))) } - - val getResp = insertedEventId.map { id => eventClient.get(id, appId) } - - val getEvents = getResp - - insertedEvent must containTheSameElementsAs(getEvents) - } - - def insertAndGetTimezone(eventClient: LEvents) = { - val listOfEvents = List(tz1, tz2, tz3) - - val insertResp = listOfEvents.map { eventClient.insert(_, appId) } - - val insertedEventId: List[String] = insertResp - - val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) - .map { case (e, id) => Some(e.copy(eventId = Some(id))) } - - val getResp = insertedEventId.map { id => eventClient.get(id, appId) } - - val getEvents = getResp - - insertedEvent must containTheSameElementsAs(getEvents) - } - - def insertAndDelete(eventClient: LEvents) = { - val eventId = eventClient.insert(r2, appId) - - val resultBefore = eventClient.get(eventId, appId) - - val expectedBefore = r2.copy(eventId = Some(eventId)) - - val deleteStatus = eventClient.delete(eventId, appId) - - val resultAfter = eventClient.get(eventId, appId) - - (resultBefore must beEqualTo(Some(expectedBefore))) and - (deleteStatus must beEqualTo(true)) and - (resultAfter 
must beEqualTo(None)) - } - - def insertTestUserEvents(eventClient: LEvents) = { - // events from TestEvents trait - val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) - - listOfEvents.map{ eventClient.insert(_, appId) } - - success - } - - def findUserEvents(eventClient: LEvents) = { - - val results: List[Event] = eventClient.find( - appId = appId, - entityType = Some("user")) - .toList - .map(e => e.copy(eventId = None)) // ignore eventID - - // same events in insertTestUserEvents - val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) - - results must containTheSameElementsAs(expected) - } - - def aggregateUserProperties(eventClient: LEvents) = { - - val result: Map[String, PropertyMap] = eventClient.aggregateProperties( - appId = appId, - entityType = "user") - - val expected = Map( - "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), - "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) - ) - - result must beEqualTo(expected) - } - - def aggregateOneUserProperties(eventClient: LEvents) = { - val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( - appId = appId, - entityType = "user", - entityId = "u1") - - val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime)) - - result must beEqualTo(expected) - } - - def aggregateNonExistentUserProperties(eventClient: LEvents) = { - val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( - appId = appId, - entityType = "user", - entityId = "u999999") - - result must beEqualTo(None) - } - - val channelId = 12 - - def initChannel(eventClient: LEvents) = { - eventClient.init(appId, Some(channelId)) - } - - def insertChannel(eventClient: LEvents) = { - - // events from TestEvents trait - val listOfEvents = List(r4,r5) - - listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) ) - - success - } - - def insertAndDeleteChannel(eventClient: LEvents) = { - - val eventId = eventClient.insert(r2, appId, Some(channelId)) - - val resultBefore = eventClient.get(eventId, appId, Some(channelId)) - - val expectedBefore = r2.copy(eventId = Some(eventId)) - - val deleteStatus = eventClient.delete(eventId, appId, Some(channelId)) - - val resultAfter = eventClient.get(eventId, appId, Some(channelId)) - - (resultBefore must beEqualTo(Some(expectedBefore))) and - (deleteStatus must beEqualTo(true)) and - (resultAfter must beEqualTo(None)) - } - - def findChannel(eventClient: LEvents) = { - - val results: List[Event] = eventClient.find( - appId = appId, - channelId = Some(channelId) - ) - .toList - .map(e => e.copy(eventId = None)) // ignore eventId - - // same events in insertChannel - val expected = List(r4, r5) - - results must containTheSameElementsAs(expected) - } - - def removeDefault(eventClient: LEvents) = { - eventClient.remove(appId) - } - - def removeChannel(eventClient: LEvents) = { - eventClient.remove(appId, Some(channelId)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala deleted file mode 100644 index ccd71a4..0000000 --- a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage - -import org.specs2._ -import org.specs2.specification.Step - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class PEventsSpec extends Specification with TestEvents { - - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") - val sc = new SparkContext("local[4]", "PEventAggregatorSpec test") - - val appId = 1 - val channelId = 6 - val dbName = "test_pio_storage_events_" + hashCode - - def hbLocal = Storage.getDataObject[LEvents]( - StorageTestUtils.hbaseSourceName, - dbName - ) - - def hbPar = Storage.getDataObject[PEvents]( - StorageTestUtils.hbaseSourceName, - dbName - ) - - def jdbcLocal = Storage.getDataObject[LEvents]( - StorageTestUtils.jdbcSourceName, - dbName - ) - - def jdbcPar = Storage.getDataObject[PEvents]( - StorageTestUtils.jdbcSourceName, - dbName - ) - - def stopSpark = { - sc.stop() - } - - def is = s2""" - - PredictionIO Storage PEvents Specification - - PEvents can be implemented by: - - HBPEvents ${hbPEvents} - - JDBCPEvents ${jdbcPEvents} - - (stop Spark) ${Step(sc.stop())} - - """ - - def hbPEvents = sequential ^ s2""" - - HBPEvents should - - behave like any PEvents implementation ${events(hbLocal, hbPar)} - - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))} - - """ - - def jdbcPEvents = sequential ^ s2""" - - JDBCPEvents should - - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)} - - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))} - - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))} - - """ - - def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2""" - - - (init test) ${initTest(localEventClient)} - - (insert test events) ${insertTestEvents(localEventClient)} - find in default ${find(parEventClient)} - find in channel ${findChannel(parEventClient)} - aggregate user properties in default ${aggregateUserProperties(parEventClient)} - aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)} - write to default ${write(parEventClient)} - write to channel ${writeChannel(parEventClient)} - - """ - - /* setup */ - - // events from TestEvents trait - val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2) - val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4) - - def initTest(localEventClient: LEvents) = { - localEventClient.init(appId) - localEventClient.init(appId, Some(channelId)) - } - - def insertTestEvents(localEventClient: LEvents) = { - listOfEvents.map( localEventClient.insert(_, appId) ) - // insert to channel - listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) ) - success - } - - /* following are tests 
*/ - - def find(parEventClient: PEvents) = { - val resultRDD: RDD[Event] = parEventClient.find( - appId = appId - )(sc) - - val results = resultRDD.collect.toList - .map {_.copy(eventId = None)} // ignore eventId - - results must containTheSameElementsAs(listOfEvents) - } - - def findChannel(parEventClient: PEvents) = { - val resultRDD: RDD[Event] = parEventClient.find( - appId = appId, - channelId = Some(channelId) - )(sc) - - val results = resultRDD.collect.toList - .map {_.copy(eventId = None)} // ignore eventId - - results must containTheSameElementsAs(listOfEventsChannel) - } - - def aggregateUserProperties(parEventClient: PEvents) = { - val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties( - appId = appId, - entityType = "user" - )(sc) - val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap - - val expected = Map( - "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), - "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) - ) - - result must beEqualTo(expected) - } - - def aggregateUserPropertiesChannel(parEventClient: PEvents) = { - val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties( - appId = appId, - channelId = Some(channelId), - entityType = "user" - )(sc) - val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap - - val expected = Map( - "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime) - ) - - result must beEqualTo(expected) - } - - def write(parEventClient: PEvents) = { - val written = List(r5, r6) - val writtenRDD = sc.parallelize(written) - parEventClient.write(writtenRDD, appId)(sc) - - // read back - val resultRDD = parEventClient.find( - appId = appId - )(sc) - - val results = resultRDD.collect.toList - .map { _.copy(eventId = None)} // ignore eventId - - val expected = listOfEvents ++ written - - results must containTheSameElementsAs(expected) - } - - def writeChannel(parEventClient: PEvents) = { - val written = List(r1, r5, r6) - val writtenRDD = sc.parallelize(written) - parEventClient.write(writtenRDD, appId, Some(channelId))(sc) - - // read back - val resultRDD = parEventClient.find( - appId = appId, - channelId = Some(channelId) - )(sc) - - val results = resultRDD.collect.toList - .map { _.copy(eventId = None)} // ignore eventId - - val expected = listOfEventsChannel ++ written - - results must containTheSameElementsAs(expected) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala b/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala deleted file mode 100644 index 747076a..0000000 --- a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage - -import org.apache.predictionio.data.storage.hbase.HBLEvents -import scalikejdbc._ - -object StorageTestUtils { - val hbaseSourceName = "HBASE" - val jdbcSourceName = "PGSQL" - - def dropHBaseNamespace(namespace: String): Unit = { - val eventDb = Storage.getDataObject[LEvents](hbaseSourceName, namespace) - .asInstanceOf[HBLEvents] - val admin = eventDb.client.admin - val tableNames = admin.listTableNamesByNamespace(namespace) - tableNames.foreach { name => - admin.disableTable(name) - admin.deleteTable(name) - } - - //Only empty namespaces (no tables) can be removed. - admin.deleteNamespace(namespace) - } - - def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s => - SQL(s"drop table $table").execute().apply() - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala new file mode 100644 index 0000000..c813ced --- /dev/null +++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.hbase + +import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage} +import org.specs2._ +import org.specs2.specification.Step + +class LEventsSpec extends Specification with TestEvents { + def is = s2""" + + PredictionIO Storage LEvents Specification + + Events can be implemented by: + - HBLEvents ${hbEvents} + + """ + + def hbEvents = sequential ^ s2""" + + HBLEvents should + - behave like any LEvents implementation ${events(hbDO)} + - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))} + + """ + + val appId = 1 + + def events(eventClient: LEvents) = sequential ^ s2""" + + init default ${initDefault(eventClient)} + insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)} + insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)} + insert and delete by ID ${insertAndDelete(eventClient)} + insert test user events ${insertTestUserEvents(eventClient)} + find user events ${findUserEvents(eventClient)} + aggregate user properties ${aggregateUserProperties(eventClient)} + aggregate one user properties ${aggregateOneUserProperties(eventClient)} + aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)} + init channel ${initChannel(eventClient)} + insert 2 events to channel ${insertChannel(eventClient)} + insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)} + find events from channel ${findChannel(eventClient)} + remove default ${removeDefault(eventClient)} + remove channel ${removeChannel(eventClient)} + + """ + + val dbName = "test_pio_storage_events_" + hashCode + def hbDO = Storage.getDataObject[LEvents]( + StorageTestUtils.hbaseSourceName, + dbName + ) + + def initDefault(eventClient: LEvents) = { + eventClient.init(appId) + } + + def insertAndGetEvents(eventClient: LEvents) = { + + // events from TestEvents trait + val listOfEvents = List(r1,r2,r3) + + val insertResp = listOfEvents.map { eventClient.insert(_, appId) } + + val insertedEventId: List[String] = insertResp + + val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) + .map { case (e, id) => Some(e.copy(eventId = Some(id))) } + + val getResp = insertedEventId.map { id => eventClient.get(id, appId) } + + val getEvents = getResp + + insertedEvent must containTheSameElementsAs(getEvents) + } + + def insertAndGetTimezone(eventClient: LEvents) = { + val listOfEvents = List(tz1, tz2, tz3) + + val insertResp = listOfEvents.map { eventClient.insert(_, appId) } + + val insertedEventId: List[String] = insertResp + + val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) + .map { case (e, id) => Some(e.copy(eventId = Some(id))) } + + val getResp = insertedEventId.map { id => eventClient.get(id, appId) } + + val getEvents = getResp + + insertedEvent must containTheSameElementsAs(getEvents) + } + + def insertAndDelete(eventClient: LEvents) = { + val eventId = eventClient.insert(r2, appId) + + val resultBefore = eventClient.get(eventId, appId) + + val expectedBefore = r2.copy(eventId = Some(eventId)) + + val deleteStatus = eventClient.delete(eventId, appId) + + val resultAfter = eventClient.get(eventId, appId) + + (resultBefore must beEqualTo(Some(expectedBefore))) and + (deleteStatus must beEqualTo(true)) and + (resultAfter must beEqualTo(None)) + } + + def insertTestUserEvents(eventClient: LEvents) = { + // events from TestEvents trait + val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, 
u2e3, u2e1, u1e4, u1e2) + + listOfEvents.map{ eventClient.insert(_, appId) } + + success + } + + def findUserEvents(eventClient: LEvents) = { + + val results: List[Event] = eventClient.find( + appId = appId, + entityType = Some("user")) + .toList + .map(e => e.copy(eventId = None)) // ignore eventID + + // same events in insertTestUserEvents + val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) + + results must containTheSameElementsAs(expected) + } + + def aggregateUserProperties(eventClient: LEvents) = { + + val result: Map[String, PropertyMap] = eventClient.aggregateProperties( + appId = appId, + entityType = "user") + + val expected = Map( + "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), + "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) + ) + + result must beEqualTo(expected) + } + + def aggregateOneUserProperties(eventClient: LEvents) = { + val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( + appId = appId, + entityType = "user", + entityId = "u1") + + val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime)) + + result must beEqualTo(expected) + } + + def aggregateNonExistentUserProperties(eventClient: LEvents) = { + val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( + appId = appId, + entityType = "user", + entityId = "u999999") + + result must beEqualTo(None) + } + + val channelId = 12 + + def initChannel(eventClient: LEvents) = { + eventClient.init(appId, Some(channelId)) + } + + def insertChannel(eventClient: LEvents) = { + + // events from TestEvents trait + val listOfEvents = List(r4,r5) + + listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) ) + + success + } + + def insertAndDeleteChannel(eventClient: LEvents) = { + + val eventId = eventClient.insert(r2, appId, Some(channelId)) + + val resultBefore = eventClient.get(eventId, appId, Some(channelId)) + + val expectedBefore = r2.copy(eventId = Some(eventId)) + + val deleteStatus = eventClient.delete(eventId, appId, Some(channelId)) + + val resultAfter = eventClient.get(eventId, appId, Some(channelId)) + + (resultBefore must beEqualTo(Some(expectedBefore))) and + (deleteStatus must beEqualTo(true)) and + (resultAfter must beEqualTo(None)) + } + + def findChannel(eventClient: LEvents) = { + + val results: List[Event] = eventClient.find( + appId = appId, + channelId = Some(channelId) + ) + .toList + .map(e => e.copy(eventId = None)) // ignore eventId + + // same events in insertChannel + val expected = List(r4, r5) + + results must containTheSameElementsAs(expected) + } + + def removeDefault(eventClient: LEvents) = { + eventClient.remove(appId) + } + + def removeChannel(eventClient: LEvents) = { + eventClient.remove(appId, Some(channelId)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala new file mode 100644 index 0000000..d675e55 --- /dev/null +++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.hbase + +import org.apache.predictionio.data.storage._ +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.specs2._ +import org.specs2.specification.Step + +class PEventsSpec extends Specification with TestEvents { + + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + val sc = new SparkContext("local[4]", "PEventAggregatorSpec test") + + val appId = 1 + val channelId = 6 + val dbName = "test_pio_storage_events_" + hashCode + + def hbLocal = Storage.getDataObject[LEvents]( + StorageTestUtils.hbaseSourceName, + dbName + ) + + def hbPar = Storage.getDataObject[PEvents]( + StorageTestUtils.hbaseSourceName, + dbName + ) + + def stopSpark = { + sc.stop() + } + + def is = s2""" + + PredictionIO Storage PEvents Specification + + PEvents can be implemented by: + - HBPEvents ${hbPEvents} + - (stop Spark) ${Step(sc.stop())} + + """ + + def hbPEvents = sequential ^ s2""" + + HBPEvents should + - behave like any PEvents implementation ${events(hbLocal, hbPar)} + - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))} + + """ + + def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2""" + + - (init test) ${initTest(localEventClient)} + - (insert test events) ${insertTestEvents(localEventClient)} + find in default ${find(parEventClient)} + find in channel ${findChannel(parEventClient)} + aggregate user properties in default ${aggregateUserProperties(parEventClient)} + aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)} + write to default ${write(parEventClient)} + write to channel ${writeChannel(parEventClient)} + + """ + + /* setup */ + + // events from TestEvents trait + val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2) + val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4) + + def initTest(localEventClient: LEvents) = { + localEventClient.init(appId) + localEventClient.init(appId, Some(channelId)) + } + + def insertTestEvents(localEventClient: LEvents) = { + listOfEvents.map( localEventClient.insert(_, appId) ) + // insert to channel + listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) ) + success + } + + /* following are tests */ + + def find(parEventClient: PEvents) = { + val resultRDD: RDD[Event] = parEventClient.find( + appId = appId + )(sc) + + val results = resultRDD.collect.toList + .map {_.copy(eventId = None)} // ignore eventId + + results must containTheSameElementsAs(listOfEvents) + } + + def findChannel(parEventClient: PEvents) = { + val resultRDD: RDD[Event] = parEventClient.find( + appId = appId, + channelId = Some(channelId) + )(sc) + + val results = resultRDD.collect.toList + .map {_.copy(eventId = None)} // ignore eventId + + results must 
containTheSameElementsAs(listOfEventsChannel) + } + + def aggregateUserProperties(parEventClient: PEvents) = { + val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties( + appId = appId, + entityType = "user" + )(sc) + val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap + + val expected = Map( + "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), + "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) + ) + + result must beEqualTo(expected) + } + + def aggregateUserPropertiesChannel(parEventClient: PEvents) = { + val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties( + appId = appId, + channelId = Some(channelId), + entityType = "user" + )(sc) + val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap + + val expected = Map( + "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime) + ) + + result must beEqualTo(expected) + } + + def write(parEventClient: PEvents) = { + val written = List(r5, r6) + val writtenRDD = sc.parallelize(written) + parEventClient.write(writtenRDD, appId)(sc) + + // read back + val resultRDD = parEventClient.find( + appId = appId + )(sc) + + val results = resultRDD.collect.toList + .map { _.copy(eventId = None)} // ignore eventId + + val expected = listOfEvents ++ written + + results must containTheSameElementsAs(expected) + } + + def writeChannel(parEventClient: PEvents) = { + val written = List(r1, r5, r6) + val writtenRDD = sc.parallelize(written) + parEventClient.write(writtenRDD, appId, Some(channelId))(sc) + + // read back + val resultRDD = parEventClient.find( + appId = appId, + channelId = Some(channelId) + )(sc) + + val results = resultRDD.collect.toList + .map { _.copy(eventId = None)} // ignore eventId + + val expected = listOfEventsChannel ++ written + + results must containTheSameElementsAs(expected) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala new file mode 100644 index 0000000..161cf90 --- /dev/null +++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.hbase + +import org.apache.predictionio.data.storage.{LEvents, Storage} + +object StorageTestUtils { + val hbaseSourceName = "HBASE" + + def dropHBaseNamespace(namespace: String): Unit = { + val eventDb = Storage.getDataObject[LEvents](hbaseSourceName, namespace) + .asInstanceOf[HBLEvents] + val admin = eventDb.client.admin + val tableNames = admin.listTableNamesByNamespace(namespace) + tableNames.foreach { name => + admin.disableTable(name) + admin.deleteTable(name) + } + + // Only empty namespaces (no tables) can be removed. + admin.deleteNamespace(namespace) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala new file mode 100644 index 0000000..2171864 --- /dev/null +++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.hbase + +import org.apache.predictionio.data.storage.{DataMap, Event} +import org.joda.time.{DateTime, DateTimeZone} + +trait TestEvents { + + val u1BaseTime = new DateTime(654321) + val u2BaseTime = new DateTime(6543210) + val u3BaseTime = new DateTime(6543410) + + // u1 events + val u1e1 = Event( + event = "$set", + entityType = "user", + entityId = "u1", + properties = DataMap( + """{ + "a" : 1, + "b" : "value2", + "d" : [1, 2, 3], + }"""), + eventTime = u1BaseTime + ) + + val u1e2 = u1e1.copy( + event = "$set", + properties = DataMap("""{"a" : 2}"""), + eventTime = u1BaseTime.plusDays(1) + ) + + val u1e3 = u1e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value4"}"""), + eventTime = u1BaseTime.plusDays(2) + ) + + val u1e4 = u1e1.copy( + event = "$unset", + properties = DataMap("""{"b" : null}"""), + eventTime = u1BaseTime.plusDays(3) + ) + + val u1e5 = u1e1.copy( + event = "$set", + properties = DataMap("""{"e" : "new"}"""), + eventTime = u1BaseTime.plusDays(4) + ) + + val u1LastTime = u1BaseTime.plusDays(4) + val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}""" + + // delete event for u1 + val u1ed = u1e1.copy( + event = "$delete", + properties = DataMap(), + eventTime = u1BaseTime.plusDays(5) + ) + + // u2 events + val u2e1 = Event( + event = "$set", + entityType = "user", + entityId = "u2", + properties = DataMap( + """{ + "a" : 21, + "b" : "value12", + "d" : [7, 5, 6], + }"""), + eventTime = u2BaseTime + ) + + val u2e2 = u2e1.copy( + event = "$unset", + properties = DataMap("""{"a" : null}"""), + eventTime = u2BaseTime.plusDays(1) + ) + + val u2e3 = u2e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value9", "g": "new11"}"""), + eventTime = u2BaseTime.plusDays(2) + ) + + val u2LastTime = u2BaseTime.plusDays(2) + val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}""" + + // u3 events + val u3e1 = Event( + event = "$set", + entityType = "user", + entityId = "u3", + properties = DataMap( + """{ + "a" : 22, + "b" : "value13", + "d" : [5, 6, 1], + }"""), + eventTime = u3BaseTime + ) + + val u3e2 = u3e1.copy( + event = "$unset", + properties = DataMap("""{"a" : null}"""), + eventTime = u3BaseTime.plusDays(1) + ) + + val u3e3 = u3e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""), + eventTime = u3BaseTime.plusDays(2) + ) + + val u3LastTime = u3BaseTime.plusDays(2) + val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}""" + + // some random events + val r1 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now, + prId = Some("my_prid") + ) + val r2 = Event( + event = "my_event2", + entityType = "my_entity_type2", + entityId = "my_entity_id2" + ) + val r3 = Event( + event = "my_event3", + entityType = "my_entity_type", + entityId = "my_entity_id", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "propA" : 1.2345, + "propB" : "valueB", + }""" + ), + prId = Some("my_prid") + ) + val r4 = Event( + event = "my_event4", + entityType = "my_entity_type4", + entityId = "my_entity_id4", + targetEntityType = Some("my_target_entity_type4"), + 
targetEntityId = Some("my_target_entity_id4"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }"""), + eventTime = DateTime.now + ) + val r5 = Event( + event = "my_event5", + entityType = "my_entity_type5", + entityId = "my_entity_id5", + targetEntityType = Some("my_target_entity_type5"), + targetEntityId = Some("my_target_entity_id5"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now + ) + val r6 = Event( + event = "my_event6", + entityType = "my_entity_type6", + entityId = "my_entity_id6", + targetEntityType = Some("my_target_entity_type6"), + targetEntityId = Some("my_target_entity_id6"), + properties = DataMap( + """{ + "prop1" : 6, + "prop2" : "value2", + "prop3" : [6, 7, 8], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now + ) + + // timezone + val tz1 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id0", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")), + prId = Some("my_prid") + ) + + val tz2 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id1", + eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")), + prId = Some("my_prid") + ) + + val tz3 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id2", + eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")), + prId = Some("my_prid") + ) + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala new file mode 100644 index 0000000..d723d07 --- /dev/null +++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.jdbc + +import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage} +import org.specs2._ +import org.specs2.specification.Step + +class LEventsSpec extends Specification with TestEvents { + def is = s2""" + + PredictionIO Storage LEvents Specification + + Events can be implemented by: + - JDBCLEvents ${jdbcLEvents} + + """ + + def jdbcLEvents = sequential ^ s2""" + + JDBCLEvents should + - behave like any LEvents implementation ${events(jdbcDO)} + + """ + + val appId = 1 + + def events(eventClient: LEvents) = sequential ^ s2""" + + init default ${initDefault(eventClient)} + insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)} + insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)} + insert and delete by ID ${insertAndDelete(eventClient)} + insert test user events ${insertTestUserEvents(eventClient)} + find user events ${findUserEvents(eventClient)} + aggregate user properties ${aggregateUserProperties(eventClient)} + aggregate one user properties ${aggregateOneUserProperties(eventClient)} + aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)} + init channel ${initChannel(eventClient)} + insert 2 events to channel ${insertChannel(eventClient)} + insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)} + find events from channel ${findChannel(eventClient)} + remove default ${removeDefault(eventClient)} + remove channel ${removeChannel(eventClient)} + + """ + + val dbName = "test_pio_storage_events_" + hashCode + + def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName) + + def initDefault(eventClient: LEvents) = { + eventClient.init(appId) + } + + def insertAndGetEvents(eventClient: LEvents) = { + + // events from TestEvents trait + val listOfEvents = List(r1,r2,r3) + + val insertResp = listOfEvents.map { eventClient.insert(_, appId) } + + val insertedEventId: List[String] = insertResp + + val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) + .map { case (e, id) => Some(e.copy(eventId = Some(id))) } + + val getResp = insertedEventId.map { id => eventClient.get(id, appId) } + + val getEvents = getResp + + insertedEvent must containTheSameElementsAs(getEvents) + } + + def insertAndGetTimezone(eventClient: LEvents) = { + val listOfEvents = List(tz1, tz2, tz3) + + val insertResp = listOfEvents.map { eventClient.insert(_, appId) } + + val insertedEventId: List[String] = insertResp + + val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) + .map { case (e, id) => Some(e.copy(eventId = Some(id))) } + + val getResp = insertedEventId.map { id => eventClient.get(id, appId) } + + val getEvents = getResp + + insertedEvent must containTheSameElementsAs(getEvents) + } + + def insertAndDelete(eventClient: LEvents) = { + val eventId = eventClient.insert(r2, appId) + + val resultBefore = eventClient.get(eventId, appId) + + val expectedBefore = r2.copy(eventId = Some(eventId)) + + val deleteStatus = eventClient.delete(eventId, appId) + + val resultAfter = eventClient.get(eventId, appId) + + (resultBefore must beEqualTo(Some(expectedBefore))) and + (deleteStatus must beEqualTo(true)) and + (resultAfter must beEqualTo(None)) + } + + def insertTestUserEvents(eventClient: LEvents) = { + // events from TestEvents trait + val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) + + listOfEvents.map{ 
eventClient.insert(_, appId) } + + success + } + + def findUserEvents(eventClient: LEvents) = { + + val results: List[Event] = eventClient.find( + appId = appId, + entityType = Some("user")) + .toList + .map(e => e.copy(eventId = None)) // ignore eventID + + // same events in insertTestUserEvents + val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) + + results must containTheSameElementsAs(expected) + } + + def aggregateUserProperties(eventClient: LEvents) = { + + val result: Map[String, PropertyMap] = eventClient.aggregateProperties( + appId = appId, + entityType = "user") + + val expected = Map( + "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), + "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) + ) + + result must beEqualTo(expected) + } + + def aggregateOneUserProperties(eventClient: LEvents) = { + val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( + appId = appId, + entityType = "user", + entityId = "u1") + + val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime)) + + result must beEqualTo(expected) + } + + def aggregateNonExistentUserProperties(eventClient: LEvents) = { + val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( + appId = appId, + entityType = "user", + entityId = "u999999") + + result must beEqualTo(None) + } + + val channelId = 12 + + def initChannel(eventClient: LEvents) = { + eventClient.init(appId, Some(channelId)) + } + + def insertChannel(eventClient: LEvents) = { + + // events from TestEvents trait + val listOfEvents = List(r4,r5) + + listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) ) + + success + } + + def insertAndDeleteChannel(eventClient: LEvents) = { + + val eventId = eventClient.insert(r2, appId, Some(channelId)) + + val resultBefore = eventClient.get(eventId, appId, Some(channelId)) + + val expectedBefore = r2.copy(eventId = Some(eventId)) + + val deleteStatus = eventClient.delete(eventId, appId, Some(channelId)) + + val resultAfter = eventClient.get(eventId, appId, Some(channelId)) + + (resultBefore must beEqualTo(Some(expectedBefore))) and + (deleteStatus must beEqualTo(true)) and + (resultAfter must beEqualTo(None)) + } + + def findChannel(eventClient: LEvents) = { + + val results: List[Event] = eventClient.find( + appId = appId, + channelId = Some(channelId) + ) + .toList + .map(e => e.copy(eventId = None)) // ignore eventId + + // same events in insertChannel + val expected = List(r4, r5) + + results must containTheSameElementsAs(expected) + } + + def removeDefault(eventClient: LEvents) = { + eventClient.remove(appId) + } + + def removeChannel(eventClient: LEvents) = { + eventClient.remove(appId, Some(channelId)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala new file mode 100644 index 0000000..71ebf5f --- /dev/null +++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.jdbc + +import org.apache.predictionio.data.storage._ +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.specs2._ +import org.specs2.specification.Step + +class PEventsSpec extends Specification with TestEvents { + + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + val sc = new SparkContext("local[4]", "PEventAggregatorSpec test") + + val appId = 1 + val channelId = 6 + val dbName = "test_pio_storage_events_" + hashCode + + def jdbcLocal = Storage.getDataObject[LEvents]( + StorageTestUtils.jdbcSourceName, + dbName + ) + + def jdbcPar = Storage.getDataObject[PEvents]( + StorageTestUtils.jdbcSourceName, + dbName + ) + + def stopSpark = { + sc.stop() + } + + def is = s2""" + + PredictionIO Storage PEvents Specification + + PEvents can be implemented by: + - JDBCPEvents ${jdbcPEvents} + - (stop Spark) ${Step(sc.stop())} + + """ + + def jdbcPEvents = sequential ^ s2""" + + JDBCPEvents should + - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)} + - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))} + - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))} + + """ + + def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2""" + + - (init test) ${initTest(localEventClient)} + - (insert test events) ${insertTestEvents(localEventClient)} + find in default ${find(parEventClient)} + find in channel ${findChannel(parEventClient)} + aggregate user properties in default ${aggregateUserProperties(parEventClient)} + aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)} + write to default ${write(parEventClient)} + write to channel ${writeChannel(parEventClient)} + + """ + + /* setup */ + + // events from TestEvents trait + val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2) + val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4) + + def initTest(localEventClient: LEvents) = { + localEventClient.init(appId) + localEventClient.init(appId, Some(channelId)) + } + + def insertTestEvents(localEventClient: LEvents) = { + listOfEvents.map( localEventClient.insert(_, appId) ) + // insert to channel + listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) ) + success + } + + /* following are tests */ + + def find(parEventClient: PEvents) = { + val resultRDD: RDD[Event] = parEventClient.find( + appId = appId + )(sc) + + val results = resultRDD.collect.toList + .map {_.copy(eventId = None)} // ignore eventId + + results must containTheSameElementsAs(listOfEvents) + } + + def findChannel(parEventClient: PEvents) = { + val resultRDD: RDD[Event] = parEventClient.find( + appId = appId, + channelId = Some(channelId) + )(sc) + + val results = resultRDD.collect.toList + .map {_.copy(eventId = None)} // ignore eventId + + results must 
containTheSameElementsAs(listOfEventsChannel) + } + + def aggregateUserProperties(parEventClient: PEvents) = { + val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties( + appId = appId, + entityType = "user" + )(sc) + val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap + + val expected = Map( + "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), + "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) + ) + + result must beEqualTo(expected) + } + + def aggregateUserPropertiesChannel(parEventClient: PEvents) = { + val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties( + appId = appId, + channelId = Some(channelId), + entityType = "user" + )(sc) + val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap + + val expected = Map( + "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime) + ) + + result must beEqualTo(expected) + } + + def write(parEventClient: PEvents) = { + val written = List(r5, r6) + val writtenRDD = sc.parallelize(written) + parEventClient.write(writtenRDD, appId)(sc) + + // read back + val resultRDD = parEventClient.find( + appId = appId + )(sc) + + val results = resultRDD.collect.toList + .map { _.copy(eventId = None)} // ignore eventId + + val expected = listOfEvents ++ written + + results must containTheSameElementsAs(expected) + } + + def writeChannel(parEventClient: PEvents) = { + val written = List(r1, r5, r6) + val writtenRDD = sc.parallelize(written) + parEventClient.write(writtenRDD, appId, Some(channelId))(sc) + + // read back + val resultRDD = parEventClient.find( + appId = appId, + channelId = Some(channelId) + )(sc) + + val results = resultRDD.collect.toList + .map { _.copy(eventId = None)} // ignore eventId + + val expected = listOfEventsChannel ++ written + + results must containTheSameElementsAs(expected) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala new file mode 100644 index 0000000..4bf90cf --- /dev/null +++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
new file mode 100644
index 0000000..4bf90cf
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import scalikejdbc._
+
+object StorageTestUtils {
+  val jdbcSourceName = "PGSQL"
+
+  // Test-only helper: `table` comes from the specs themselves, so raw
+  // string interpolation into SQL is acceptable here.
+  def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s =>
+    SQL(s"drop table $table").execute().apply()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
new file mode 100644
index 0000000..2cb08e5
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import org.apache.predictionio.data.storage.{DataMap, Event}
+import org.joda.time.{DateTime, DateTimeZone}
+
+trait TestEvents {
+
+  val u1BaseTime = new DateTime(654321)
+  val u2BaseTime = new DateTime(6543210)
+  val u3BaseTime = new DateTime(6543410)
+
+  // u1 events
+  val u1e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u1",
+    properties = DataMap(
+      """{
+        "a" : 1,
+        "b" : "value2",
+        "d" : [1, 2, 3]
+      }"""),
+    eventTime = u1BaseTime
+  )
+
+  val u1e2 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"a" : 2}"""),
+    eventTime = u1BaseTime.plusDays(1)
+  )
+
+  val u1e3 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value4"}"""),
+    eventTime = u1BaseTime.plusDays(2)
+  )
+
+  val u1e4 = u1e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"b" : null}"""),
+    eventTime = u1BaseTime.plusDays(3)
+  )
+
+  val u1e5 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"e" : "new"}"""),
+    eventTime = u1BaseTime.plusDays(4)
+  )
+
+  val u1LastTime = u1BaseTime.plusDays(4)
+  val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""
+
+  // delete event for u1
+  val u1ed = u1e1.copy(
+    event = "$delete",
+    properties = DataMap(),
+    eventTime = u1BaseTime.plusDays(5)
+  )
+
+  // u2 events
+  val u2e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u2",
+    properties = DataMap(
+      """{
+        "a" : 21,
+        "b" : "value12",
+        "d" : [7, 5, 6]
+      }"""),
+    eventTime = u2BaseTime
+  )
+
+  val u2e2 = u2e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"a" : null}"""),
+    eventTime = u2BaseTime.plusDays(1)
+  )
+
+  val u2e3 = u2e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
+    eventTime = u2BaseTime.plusDays(2)
+  )
+
+  val u2LastTime = u2BaseTime.plusDays(2)
+  val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""
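+
+  // The u3 events and the r3/r4 fixtures below are the ones PEventsSpec
+  // inserts into the test channel (see listOfEventsChannel), so they surface
+  // only in the channel-scoped assertions.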
"new11"}""" + + // u3 events + val u3e1 = Event( + event = "$set", + entityType = "user", + entityId = "u3", + properties = DataMap( + """{ + "a" : 22, + "b" : "value13", + "d" : [5, 6, 1], + }"""), + eventTime = u3BaseTime + ) + + val u3e2 = u3e1.copy( + event = "$unset", + properties = DataMap("""{"a" : null}"""), + eventTime = u3BaseTime.plusDays(1) + ) + + val u3e3 = u3e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""), + eventTime = u3BaseTime.plusDays(2) + ) + + val u3LastTime = u3BaseTime.plusDays(2) + val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}""" + + // some random events + val r1 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now, + prId = Some("my_prid") + ) + val r2 = Event( + event = "my_event2", + entityType = "my_entity_type2", + entityId = "my_entity_id2" + ) + val r3 = Event( + event = "my_event3", + entityType = "my_entity_type", + entityId = "my_entity_id", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "propA" : 1.2345, + "propB" : "valueB", + }""" + ), + prId = Some("my_prid") + ) + val r4 = Event( + event = "my_event4", + entityType = "my_entity_type4", + entityId = "my_entity_id4", + targetEntityType = Some("my_target_entity_type4"), + targetEntityId = Some("my_target_entity_id4"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }"""), + eventTime = DateTime.now + ) + val r5 = Event( + event = "my_event5", + entityType = "my_entity_type5", + entityId = "my_entity_id5", + targetEntityType = Some("my_target_entity_type5"), + targetEntityId = Some("my_target_entity_id5"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now + ) + val r6 = Event( + event = "my_event6", + entityType = "my_entity_type6", + entityId = "my_entity_id6", + targetEntityType = Some("my_target_entity_type6"), + targetEntityId = Some("my_target_entity_id6"), + properties = DataMap( + """{ + "prop1" : 6, + "prop2" : "value2", + "prop3" : [6, 7, 8], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now + ) + + // timezone + val tz1 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id0", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")), + prId = Some("my_prid") + ) + + val tz2 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id1", + eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")), + prId = Some("my_prid") + ) + + val tz3 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id2", + eventTime = new DateTime(12345678, 
+
+  // timezone
+  val tz1 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id0",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
+    prId = Some("my_prid")
+  )
+
+  val tz2 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id1",
+    eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
+    prId = Some("my_prid")
+  )
+
+  val tz3 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id2",
+    eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
+    prId = Some("my_prid")
+  )
+
+}
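
For reference, the fixture style above can be extended using only the APIs
this commit exercises; a minimal sketch (the "u9" entity id and its property
values are invented for illustration):

    import org.apache.predictionio.data.storage.{DataMap, Event}
    import org.joda.time.DateTime

    val base = new DateTime(654321)

    // Mirrors the u1e1/u1e4 pattern: a $set followed by an $unset of the
    // same property, so aggregation would drop "a" again.
    val setA = Event(
      event = "$set",
      entityType = "user",
      entityId = "u9",
      properties = DataMap("""{"a" : 1}"""),
      eventTime = base
    )
    val unsetA = setA.copy(
      event = "$unset",
      properties = DataMap("""{"a" : null}"""),
      eventTime = base.plusDays(1)
    )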