predictionio-user mailing list archives

From Shane Johnson <shanewaldenjohn...@gmail.com>
Subject Re: Extracting Strings from JString with PEventStore.aggregateProperties
Date Tue, 03 Oct 2017 23:23:40 GMT
Hi Team,

I spent all day learning about json4s and the solution ended up being very
simple. :) I am posting it below in case it is helpful for others using
aggregateProperties. I was calling the *fields* object when I should not
have; it put me one layer too deep, given how the get function is built.
Here is how I solved the issue.


val fieldsRDD = PEventStore.aggregateProperties(
      appName = "classi",
      entityType = "Opportunity")(sc).repartition(sc.defaultParallelism)

val oppProperties = fieldsRDD.map {
      case (oppId, propMap) => propMap // removed .fields and kept the PropertyMap
    }


val opportunity: RDD[Opportunity] = oppProperties
    .map { row =>

        new Opportunity( // I added the type as a parameter to the get function
            OpportunityId = row.get[String]("OpportunityId"),
            ContactId = row.get[String]("ContactId"),
            Amount = row.get[Double]("Amount"),
            OwnerId = row.get[String]("OwnerId"),
            ForecastCategoryName = row.get[String]("ForecastCategoryName"),
            AccountId = row.get[String]("AccountId")
        )
    }

Output:


Opportunity(006J000000RNvVYIA1,,75000.0,005J0000006a1Y1IAI,Closed,001J000001wndthIAA)
Opportunity(006J000000SQopgIAD,,14947.0,005J0000006a1Y1IAI,Closed,001J000001xjXi0IAE)
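
In short: the PropertyMap that aggregateProperties returns is a DataMap, and
its get[T] method already does the json4s extraction, while .fields drops
down to the raw Map[String, JValue]. A minimal sketch of the difference
(propMap stands for one aggregated PropertyMap):

import org.apache.predictionio.data.storage.PropertyMap
import org.json4s.JValue

def showDifference(propMap: PropertyMap): Unit = {
  // DataMap.get[T] extracts a typed value straight from the underlying json4s field
  val id: String = propMap.get[String]("OpportunityId")

  // .fields is the raw json4s map, so Map.get returns Option[JValue], e.g. Some(JString(...))
  val raw: Option[JValue] = propMap.fields.get("OpportunityId")

  println(s"typed: $id, raw: $raw")
}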


*Shane Johnson | 801.360.3350*
LinkedIn <https://www.linkedin.com/in/shanewjohnson> | Facebook
<https://www.facebook.com/shane.johnson.71653>

On Tue, Oct 3, 2017 at 4:03 PM, Shane Johnson <shanewaldenjohnson@gmail.com>
wrote:

> Hi Team,
>
> I have been working with RDDs generated by
> org.apache.predictionio.data.storage.Event
>
> and have had success creating case classes and converting the RDD to a
> simplified format that can be converted to a dataframe. I am trying to do this
> same thing with
> org.apache.predictionio.data.storage.PropertyMap
>
> but am not having success. Can someone point me in the right direction?
>
>
> *Here is what is working well. The end goal is to model the properties into
> a flat RDD and convert it to a dataframe
> (org.apache.predictionio.data.storage.Event):*
>
> *______________________________________________________________________________*
>
> 1 - First I create these two keyed RDDs of events:
>
> opp: org.apache.spark.rdd.RDD[(String, org.apache.predictionio.data.storage.Event)] = MapPartitionsRDD[39] at map at <console>:157
> close: org.apache.spark.rdd.RDD[(String, org.apache.predictionio.data.storage.Event)] = MapPartitionsRDD[41] at map at <console>:157
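>
> (For context, opp and close above come from PEventStore.find, keyed by
> entityId. A simplified sketch; the event names here are illustrative, not
> the exact ones I use:)
>
> import org.apache.predictionio.data.store.PEventStore
> import org.apache.predictionio.data.storage.Event
> import org.apache.spark.rdd.RDD
>
> // Sketch only: illustrative event names
> val opp: RDD[(String, Event)] = PEventStore.find(
>     appName = "classi",
>     entityType = Some("Opportunity"),
>     eventNames = Some(List("opportunity")))(sc)
>   .map(e => (e.entityId, e))
>
> val close: RDD[(String, Event)] = PEventStore.find(
>     appName = "classi",
>     entityType = Some("Opportunity"),
>     eventNames = Some(List("close")))(sc)
>   .map(e => (e.entityId, e))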
>
> 2 - Then I join the RDDs and map over the result to create a flat dataset
> that works well with my random forest model. I also like being able to
> convert it to a dataframe with one command. This is what I am trying to
> accomplish with the properties within the PropertyMap from the $set events.
>
> val opportunity: RDD[Opportunity] = opp.cogroup(close)
>       .map { case (oppId, (oppsIter, closeIter)) =>
>         // the earliest opportunity event is the starting point
>         val oppProgress = oppsIter.reduce { (a, b) =>
>           if (a.eventTime.isBefore(b.eventTime)) a else b
>         }
>         // any close event after that point means the opportunity closed
>         val closeWon = closeIter
>           .filter(b => b.eventTime.isAfter(oppProgress.eventTime))
>           .nonEmpty
>
>         new Opportunity(
>           naicsCode = oppProgress.properties.getOrElse[Int]("NaicsCode", 0).toString.take(3) + "000",
>           billingCountry = oppProgress.properties.getOrElse[String]("BillingCountry", ""),
>           billingState = oppProgress.properties.getOrElse[String]("BillingState", ""),
>           numberOfEmployees = employeeBand(oppProgress.properties.getOrElse[Int]("NumberOfEmployees", 0)),
>           annualRevenue = revenueBand(oppProgress.properties.getOrElse[Int]("AnnualRevenue", 0)),
>           close = closeWon
>         )
>       }
>
> opportunity.take(2).foreach(println)
>
> val opportunitydf = opportunity.toDF().show()
>
> res:
>
> opportunity: org.apache.spark.rdd.RDD[Opportunity] = MapPartitionsRDD[59] at map at <console>:161
> Opportunity(441000,United States,Virginia,10000Emp,$1B,false)
> Opportunity(522000,United States,Virginia,10000Emp,$0,false)
> +---------+--------------+------------+-----------------+-------------+-----+
> |naicsCode|billingCountry|billingState|numberOfEmployees|annualRevenue|close|
> +---------+--------------+------------+-----------------+-------------+-----+
> |   441000| United States|    Virginia|         10000Emp|          $1B|false|
> |   522000| United States|    Virginia|         10000Emp|           $0|false|
> |   326000| United States|    Virginia|          5000Emp|           $0|false|
> |   722000| United States|    Maryland|           100Emp|         $10M|false|
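>
> (employeeBand and revenueBand are small banding helpers that bucket the raw
> numbers into the labels shown above; a minimal sketch with illustrative
> thresholds only:)
>
> // Illustrative sketch of the banding helpers (thresholds are examples, not the real cut-offs)
> def employeeBand(n: Int): String =
>   if (n >= 10000) "10000Emp"
>   else if (n >= 5000) "5000Emp"
>   else if (n >= 1000) "1000Emp"
>   else if (n >= 100) "100Emp"
>   else "0Emp"
>
> def revenueBand(r: Int): String =
>   if (r >= 1000000000) "$1B"
>   else if (r >= 100000000) "$100M"
>   else if (r >= 10000000) "$10M"
>   else if (r >= 1000000) "$1M"
>   else "$0"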
>
>
> *Here is where I am stuck. I am trying to get output similar to the above,
> but I am not able to extract the fields from the map, convert the values
> from JString, JDouble, etc. into plain Strings and numeric values in an
> RDD, or convert the RDD to a dataframe.*
>
> *______________________________________________________________________________*
>
> 1 - The first step I take is to aggregate the $set events by
> calling PEventStore.aggregateProperties and then pull out the property Map.
> I am not interested in the entityId or the first/last updated times.
>
>
> val fieldsRDD = PEventStore.aggregateProperties(
>       appName = "classi",
>       entityType = "Opportunity")(sc).repartition(sc.defaultParallelism)
>
> val oppProperties = fieldsRDD.map {
>       case (oppId, propMap) => propMap.fields
>     }
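>
> (For reference, here is the same step with explicit types as I understand
> them: aggregateProperties returns an RDD[(String, PropertyMap)], and .fields
> drops down to the raw json4s map, which is what I map over in step 2 below.)
>
> import org.apache.predictionio.data.storage.PropertyMap
> import org.apache.spark.rdd.RDD
> import org.json4s.JValue
>
> // Same step, annotated with types
> val fieldsRDD: RDD[(String, PropertyMap)] = PEventStore.aggregateProperties(
>       appName = "classi",
>       entityType = "Opportunity")(sc).repartition(sc.defaultParallelism)
>
> val oppProperties: RDD[Map[String, JValue]] = fieldsRDD.map {
>       case (oppId, propMap) => propMap.fields
>     }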
>
> 2 - Then I create a case class and attempt to map over the oppProperties
> RDD to create the flattened RDD in preparation for converting it to a
> dataframe. *This is where I am stuck.*
>
>
> case class Opportunity(
>   OpportunityId: String,
>   ContactId: String,
>   Amount: Double,
>   OwnerId: String,
>   ForecastCategoryName: String,
>   AccountId: String
> ) extends Serializable
>
> val opportunity: RDD[Opportunity] = oppProperties
>     .map { row =>
>
>         new Opportunity(
>             OpportunityId = row.get("OpportunityId"),
>             ContactId = row.get("ContactId"),
>             Amount = row.get("Amount"),
>             OwnerId = row.get("OwnerId"),
>             ForecastCategoryName = row.get("ForecastCategoryName"),
>             AccountId = row.get("AccountId")
>         )
>     }
>
> opportunity.take(2).foreach(println)
>
> Error:
>
> type mismatch;
> found : Option[org.json4s.JValue]
> (which expands to) Option[org.json4s.JsonAST.JValue]
> required: String
> OpportunityId = row.get("OpportunityId"),
> ^
>
> If I set everything in the case class *Opportunity* to `Any` I can get one
> step further, but I am assuming I need to call *get* differently to convert
> the values from *e.g. Some(JString("Text")) to just the Text*. With the
> fields typed as *Any* I get the Undesired RDD output below; I need the
> values converted so the RDD looks like the Desired RDD below.
>
> Undesired RDD:
> Opportunity(Some(JString(006J000000RNvVYIA1)),Some(JString()),Some(JDouble(75000.0)),Some(JString(005J0000006a1Y1IAI)),Some(JString(Closed)),Some(JString(001J000001wndthIAA)))
> Opportunity(Some(JString(006J000000SQopgIAD)),Some(JString()),Some(JDouble(14947.0)),Some(JString(005J0000006a1Y1IAI)),Some(JString(Closed)),Some(JString(001J000001xjXi0IAE)))
>
> Desired RDD:
> Opportunity(006J000000RNvVYIA1,,75000.0,005J0000006a1Y1IAI,Closed,001J000001wndthIAA)
> Opportunity(006J000000SQopgIAD,,14947.0,005J0000006a1Y1IAI,,001J000001xjXi0IAE)
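>
> (Roughly, the kind of conversion I think I need is manual unwrapping of the
> json4s values, along these lines; this is only a sketch of the idea, not
> something I have working:)
>
> import org.json4s._
>
> // Sketch: manually unwrapping json4s values from the fields map
> def getString(m: Map[String, JValue], key: String): String = m.get(key) match {
>   case Some(JString(s)) => s
>   case _                => ""
> }
>
> def getDouble(m: Map[String, JValue], key: String): Double = m.get(key) match {
>   case Some(JDouble(d)) => d
>   case Some(JInt(i))    => i.toDouble
>   case _                => 0.0
> }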
>
> Thank you in advance for any advice or direction you can provide.
>
> Best,
>
> Shane
>
> Shane Johnson | 801.360.3350
> LinkedIn <https://www.linkedin.com/in/shanewjohnson> | Facebook
> <https://www.facebook.com/shane.johnson.71653>
>
>
>
