predictionio-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From don...@apache.org
Subject predictionio git commit: [PIO-171] Hack in build.sbt for switching between Spark 1.x and 2.x should be cleaned up.
Date Thu, 04 Oct 2018 06:33:41 GMT
Repository: predictionio
Updated Branches:
  refs/heads/develop 0193c8b16 -> b3fba2eac


[PIO-171] Hack in build.sbt for switching between Spark 1.x and 2.x should be cleaned up.

Closes #477


Project: http://git-wip-us.apache.org/repos/asf/predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/predictionio/commit/b3fba2ea
Tree: http://git-wip-us.apache.org/repos/asf/predictionio/tree/b3fba2ea
Diff: http://git-wip-us.apache.org/repos/asf/predictionio/diff/b3fba2ea

Branch: refs/heads/develop
Commit: b3fba2eacd75e1b422a204067e5a76dcd5d7b17d
Parents: 0193c8b
Author: saurabh gulati <saurabh3091@gmail.com>
Authored: Wed Oct 3 23:32:27 2018 -0700
Committer: Donald Szeto <donald@apache.org>
Committed: Wed Oct 3 23:33:03 2018 -0700

----------------------------------------------------------------------
 build.sbt                                       |   2 -
 .../data/store/python/PPythonEventStore.scala   | 146 +++++++++++++++++++
 .../predictionio/data/view/DataView.scala       |   9 +-
 .../data/SparkVersionDependent.scala            |  30 ----
 .../data/store/python/PPythonEventStore.scala   | 146 -------------------
 .../data/storage/jdbc/JDBCPEvents.scala         |   8 +-
 .../tools/export/EventsToFile.scala             |   6 +-
 7 files changed, 153 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/predictionio/blob/b3fba2ea/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 9eea3e0..533fbef 100644
--- a/build.sbt
+++ b/build.sbt
@@ -129,8 +129,6 @@ val data = (project in file("data")).
   settings(commonSettings: _*).
   settings(commonTestSettings: _*).
   enablePlugins(GenJavadocPlugin).
-  settings(unmanagedSourceDirectories in Compile +=
-    sourceDirectory.value / s"main/spark-${majorVersion(sparkVersion.value)}").
   disablePlugins(sbtassembly.AssemblyPlugin)
 
 val core = (project in file("core")).

http://git-wip-us.apache.org/repos/asf/predictionio/blob/b3fba2ea/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
new file mode 100644
index 0000000..1d03634
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.predictionio.data.store.python
+
+import java.sql.Timestamp
+
+import org.apache.predictionio.data.store.PEventStore
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.joda.time.DateTime
+
+
+/** This object provides a set of operation to access Event Store
+  * with Spark's parallelization
+  */
+object PPythonEventStore {
+
+
+  /** Read events from Event Store
+    *
+    * @param appName          return events of this app
+    * @param channelName      return events of this channel (default channel if it's None)
+    * @param startTime        return events with eventTime >= startTime
+    * @param untilTime        return events with eventTime < untilTime
+    * @param entityType       return events of this entityType
+    * @param entityId         return events of this entityId
+    * @param eventNames       return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId   return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param spark            Spark context
+    * @return DataFrame
+    */
+  def find(
+            appName: String,
+            channelName: String,
+            startTime: Timestamp,
+            untilTime: Timestamp,
+            entityType: String,
+            entityId: String,
+            eventNames: Array[String],
+            targetEntityType: String,
+            targetEntityId: String
+          )(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    val colNames: Seq[String] =
+      Seq(
+        "eventId",
+        "event",
+        "entityType",
+        "entityId",
+        "targetEntityType",
+        "targetEntityId",
+        "eventTime",
+        "tags",
+        "prId",
+        "creationTime",
+        "fields"
+      )
+    PEventStore.find(appName,
+      Option(channelName),
+      Option(startTime).map(t => new DateTime(t.getTime)),
+      Option(untilTime).map(t => new DateTime(t.getTime)),
+      Option(entityType),
+      Option(entityId),
+      Option(eventNames),
+      Option(Option(targetEntityType)),
+      Option(Option(targetEntityId)))(spark.sparkContext).map { e =>
+      (
+        e.eventId,
+        e.event,
+        e.entityType,
+        e.entityId,
+        e.targetEntityType.orNull,
+        e.targetEntityId.orNull,
+        new Timestamp(e.eventTime.getMillis),
+        e.tags.mkString("\t"),
+        e.prId.orNull,
+        new Timestamp(e.creationTime.getMillis),
+        e.properties.fields.mapValues(_.values.toString)
+      )
+    }.toDF(colNames: _*)
+  }
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, \$delete events.
+    *
+    * @param appName     use events of this app
+    * @param entityType  aggregate properties of the entities of this entityType
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime   use events with eventTime >= startTime
+    * @param untilTime   use events with eventTime < untilTime
+    * @param required    only keep entities with these required properties defined
+    * @param spark       Spark session
+    * @return DataFrame  DataFrame of entityId and PropetyMap pair
+    */
+  def aggregateProperties(
+                           appName: String,
+                           entityType: String,
+                           channelName: String,
+                           startTime: Timestamp,
+                           untilTime: Timestamp,
+                           required: Array[String]
+                         )
+                         (spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    val colNames: Seq[String] =
+      Seq(
+        "entityId",
+        "firstUpdated",
+        "lastUpdated",
+        "fields"
+      )
+    PEventStore.aggregateProperties(appName,
+      entityType,
+      Option(channelName),
+      Option(startTime).map(t => new DateTime(t.getTime)),
+      Option(untilTime).map(t => new DateTime(t.getTime)),
+      Option(required.toSeq))(spark.sparkContext).map { x =>
+      val m = x._2
+      (x._1,
+        new Timestamp(m.firstUpdated.getMillis),
+        new Timestamp(m.lastUpdated.getMillis),
+        m.fields.mapValues(_.values.toString)
+      )
+    }.toDF(colNames: _*)
+  }
+}

http://git-wip-us.apache.org/repos/asf/predictionio/blob/b3fba2ea/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
index 1c47e10..ca92e8f 100644
--- a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
@@ -20,14 +20,10 @@ package org.apache.predictionio.data.view
 
 import org.apache.predictionio.annotation.Experimental
 import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.SparkVersionDependent
-
 import grizzled.slf4j.Logger
 import org.apache.predictionio.data.store.PEventStore
-
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.SparkContext
 import org.joda.time.DateTime
 
@@ -52,7 +48,6 @@ object DataView {
     * @param name identify the DataFrame created
     * @param version used to track changes to the conversionFunction, e.g. version = "20150413"
     *                and update whenever the function is changed.
-    * @param sqlContext SQL context
     * @tparam E the output type of the conversion function. The type needs to extend Product
     *           (e.g. case class)
     * @return a DataFrame of events
@@ -69,7 +64,7 @@ object DataView {
 
     @transient lazy val logger = Logger[this.type]
 
-    val sqlSession = SparkVersionDependent.sqlSession(sc)
+    val sqlSession = SparkSession.builder().getOrCreate()
 
     val beginTime = startTime match {
       case Some(t) => t

http://git-wip-us.apache.org/repos/asf/predictionio/blob/b3fba2ea/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
----------------------------------------------------------------------
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala b/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
deleted file mode 100644
index 3d07bdf..0000000
--- a/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SparkSession
-
-object SparkVersionDependent {
-
-  def sqlSession(sc: SparkContext): SparkSession = {
-    SparkSession.builder().getOrCreate()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/predictionio/blob/b3fba2ea/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
deleted file mode 100644
index 1d03634..0000000
--- a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.predictionio.data.store.python
-
-import java.sql.Timestamp
-
-import org.apache.predictionio.data.store.PEventStore
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.joda.time.DateTime
-
-
-/** This object provides a set of operation to access Event Store
-  * with Spark's parallelization
-  */
-object PPythonEventStore {
-
-
-  /** Read events from Event Store
-    *
-    * @param appName          return events of this app
-    * @param channelName      return events of this channel (default channel if it's None)
-    * @param startTime        return events with eventTime >= startTime
-    * @param untilTime        return events with eventTime < untilTime
-    * @param entityType       return events of this entityType
-    * @param entityId         return events of this entityId
-    * @param eventNames       return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId   return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param spark            Spark context
-    * @return DataFrame
-    */
-  def find(
-            appName: String,
-            channelName: String,
-            startTime: Timestamp,
-            untilTime: Timestamp,
-            entityType: String,
-            entityId: String,
-            eventNames: Array[String],
-            targetEntityType: String,
-            targetEntityId: String
-          )(spark: SparkSession): DataFrame = {
-    import spark.implicits._
-    val colNames: Seq[String] =
-      Seq(
-        "eventId",
-        "event",
-        "entityType",
-        "entityId",
-        "targetEntityType",
-        "targetEntityId",
-        "eventTime",
-        "tags",
-        "prId",
-        "creationTime",
-        "fields"
-      )
-    PEventStore.find(appName,
-      Option(channelName),
-      Option(startTime).map(t => new DateTime(t.getTime)),
-      Option(untilTime).map(t => new DateTime(t.getTime)),
-      Option(entityType),
-      Option(entityId),
-      Option(eventNames),
-      Option(Option(targetEntityType)),
-      Option(Option(targetEntityId)))(spark.sparkContext).map { e =>
-      (
-        e.eventId,
-        e.event,
-        e.entityType,
-        e.entityId,
-        e.targetEntityType.orNull,
-        e.targetEntityId.orNull,
-        new Timestamp(e.eventTime.getMillis),
-        e.tags.mkString("\t"),
-        e.prId.orNull,
-        new Timestamp(e.creationTime.getMillis),
-        e.properties.fields.mapValues(_.values.toString)
-      )
-    }.toDF(colNames: _*)
-  }
-
-  /** Aggregate properties of entities based on these special events:
-    * \$set, \$unset, \$delete events.
-    *
-    * @param appName     use events of this app
-    * @param entityType  aggregate properties of the entities of this entityType
-    * @param channelName use events of this channel (default channel if it's None)
-    * @param startTime   use events with eventTime >= startTime
-    * @param untilTime   use events with eventTime < untilTime
-    * @param required    only keep entities with these required properties defined
-    * @param spark       Spark session
-    * @return DataFrame  DataFrame of entityId and PropetyMap pair
-    */
-  def aggregateProperties(
-                           appName: String,
-                           entityType: String,
-                           channelName: String,
-                           startTime: Timestamp,
-                           untilTime: Timestamp,
-                           required: Array[String]
-                         )
-                         (spark: SparkSession): DataFrame = {
-    import spark.implicits._
-    val colNames: Seq[String] =
-      Seq(
-        "entityId",
-        "firstUpdated",
-        "lastUpdated",
-        "fields"
-      )
-    PEventStore.aggregateProperties(appName,
-      entityType,
-      Option(channelName),
-      Option(startTime).map(t => new DateTime(t.getTime)),
-      Option(untilTime).map(t => new DateTime(t.getTime)),
-      Option(required.toSeq))(spark.sparkContext).map { x =>
-      val m = x._2
-      (x._1,
-        new Timestamp(m.firstUpdated.getMillis),
-        new Timestamp(m.lastUpdated.getMillis),
-        m.fields.mapValues(_.values.toString)
-      )
-    }.toDF(colNames: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/predictionio/blob/b3fba2ea/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
index d31e592..4fa8b9f 100644
--- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -20,12 +20,10 @@ package org.apache.predictionio.data.storage.jdbc
 import java.sql.{DriverManager, ResultSet}
 
 import com.github.nscala_time.time.Imports._
-import org.apache.predictionio.data.storage.{
-  DataMap, Event, PEvents, StorageClientConfig}
-import org.apache.predictionio.data.SparkVersionDependent
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.{JdbcRDD, RDD}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.json4s.JObject
 import org.json4s.native.Serialization
 import scalikejdbc._
@@ -121,7 +119,7 @@ class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String
   }
 
   def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-    val sqlSession = SparkVersionDependent.sqlSession(sc)
+    val sqlSession = SparkSession.builder().getOrCreate()
     import sqlSession.implicits._
 
     val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)

http://git-wip-us.apache.org/repos/asf/predictionio/blob/b3fba2ea/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
index 0372a44..9b6dbb5 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
@@ -21,14 +21,12 @@ package org.apache.predictionio.tools.export
 import org.apache.predictionio.controller.Utils
 import org.apache.predictionio.data.storage.EventJson4sSupport
 import org.apache.predictionio.data.storage.Storage
-import org.apache.predictionio.data.SparkVersionDependent
 import org.apache.predictionio.tools.Runner
 import org.apache.predictionio.workflow.WorkflowContext
 import org.apache.predictionio.workflow.WorkflowUtils
 import org.apache.predictionio.workflow.CleanupFunctions
-
 import grizzled.slf4j.Logging
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.json4s.native.Serialization._
 
 case class EventsToFileArgs(
@@ -93,7 +91,7 @@ object EventsToFile extends Logging {
           mode = "Export",
           batch = "App ID " + args.appId + channelStr,
           executorEnv = Runner.envStringToMap(args.env))
-        val sqlSession = SparkVersionDependent.sqlSession(sc)
+        val sqlSession = SparkSession.builder().getOrCreate()
         val events = Storage.getPEvents()
         val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
         val jsonStringRdd = eventsRdd.map(write(_))


Mime
View raw message