predictionio-user mailing list archives

From Donald Szeto <don...@apache.org>
Subject Re: Extracting Strings from JString with PEventStore.aggregateProperties
Date Wed, 04 Oct 2017 04:57:22 GMT
Hi Shane,

Thanks for your findings! Perhaps we should improve the documentation, or
even make the `fields` member private.

The use of `get` is noted here:
https://github.com/apache/incubator-predictionio/blob/develop/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala#L29
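
For anyone who finds this thread later, the typed getter is roughly of this
shape - a minimal sketch of how a json4s-backed map can expose typed access,
not the exact PredictionIO source (the SimplePropertyMap name is only for
illustration):

import org.json4s._

// Sketch: a map of raw JValues plus a typed getter in the spirit of
// PropertyMap.get[T]; DefaultFormats drives the json4s extraction.
case class SimplePropertyMap(fields: Map[String, JValue]) {
  implicit val formats: Formats = DefaultFormats

  // e.g. get[String]("OpportunityId") extracts the field as a String
  def get[T: Manifest](name: String): T = fields(name).extract[T]

  // Optional variant for fields that may be absent
  def getOpt[T: Manifest](name: String): Option[T] =
    fields.get(name).map(_.extract[T])
}

Calling get on the property map itself, rather than on its fields map, is
what performs the extraction, which matches the fix in the quoted message
below.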

Regards,
Donald

On Tue, Oct 3, 2017 at 4:23 PM, Shane Johnson <shanewaldenjohnson@gmail.com> wrote:

> Hi Team,
>
> I spent all day learning about json4s, and the solution ended up being very
> simple :) I am posting it below in case it is helpful for others using
> aggregateProperties. I was calling the *fields* object when I should not
> have; it was putting me one layer too deep, given how the get function is
> built. Here is how I solved my issue.
>
>
> val fieldsRDD = PEventStore.aggregateProperties(
>       appName = "classi",
>       entityType = "Opportunity")(sc).repartition(sc.defaultParallelism)
>
> val oppProperties = fieldsRDD.map {
>       case (oppId, propMap) => propMap // .fields (I removed fields)
>     }
>
> val opportunity: RDD[Opportunity] = oppProperties
>     .map { row =>
>         // I added the type as a parameter to the get function
>         new Opportunity(
>             OpportunityId = row.get[String]("OpportunityId"),
>             ContactId = row.get[String]("ContactId"),
>             Amount = row.get[Double]("Amount"),
>             OwnerId = row.get[String]("OwnerId"),
>             ForecastCategoryName = row.get[String]("ForecastCategoryName"),
>             AccountId = row.get[String]("AccountId")
>         )
>     }
>
> Output:
>
>
> Opportunity(006J000000RNvVYIA1,,75000.0,005J0000006a1Y1IAI,Closed,001J000001wndthIAA)
> Opportunity(006J000000SQopgIAD,,14947.0,005J0000006a1Y1IAI,Closed,001J000001xjXi0IAE)
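>
> In case it helps others: for entities where one of these properties might
> be missing entirely, a getOrElse fallback (the same call I use on
> Event.properties further down in the thread) should keep the map step from
> failing - the defaults here are only placeholders:
>
> val opportunitySafe: RDD[Opportunity] = oppProperties.map { row =>
>     new Opportunity(
>         OpportunityId = row.get[String]("OpportunityId"),
>         ContactId = row.getOrElse[String]("ContactId", ""),
>         Amount = row.getOrElse[Double]("Amount", 0.0),
>         OwnerId = row.getOrElse[String]("OwnerId", ""),
>         ForecastCategoryName = row.getOrElse[String]("ForecastCategoryName", ""),
>         AccountId = row.getOrElse[String]("AccountId", "")
>     )
> }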
>
>
> *Shane Johnson | 801.360.3350*
> LinkedIn <https://www.linkedin.com/in/shanewjohnson> | Facebook
> <https://www.facebook.com/shane.johnson.71653>
>
> On Tue, Oct 3, 2017 at 4:03 PM, Shane Johnson <shanewaldenjohnson@gmail.com> wrote:
>
>> Hi Team,
>>
>> I have been working with RDDs generated by
>> org.apache.predictionio.data.storage.Event
>>
>> and have had success creating case classes and converting the RDD to a
>> simplified format that can be converted to a dataframe. I am trying to do
>> the same thing with
>> org.apache.predictionio.data.storage.PropertyMap
>>
>> but am not having success. Can someone point me in the right direction?
>>
>>
>> *Here is what is working well - the end goal is to model the properties
>> into a flat RDD and convert it to a dataframe
>> (org.apache.predictionio.data.storage.Event):*
>>
>> *______________________________________________________________________________*
>>
>> 1 - First I create these two RDDs:
>>
>> opp: org.apache.spark.rdd.RDD[(String, org.apache.predictionio.data.storage.Event)] = MapPartitionsRDD[39] at map at <console>:157
>> close: org.apache.spark.rdd.RDD[(String, org.apache.predictionio.data.storage.Event)] = MapPartitionsRDD[41] at map at <console>:157
>>
>> 2 - Then I join the RDDs and map over them to create a flat dataset that
>> will work well with my random forest model. I also like being able to
>> convert it to a dataframe with one command. This is what I am trying to
>> accomplish with the properties within the PropertyMap from $set events.
>>
>> val opportunity: RDD[Opportunity] = opp.cogroup(close)
>>       .map { case (oppId, (oppsIter, closeIter)) =>
>>         // the earliest opportunity event for this oppId
>>         val oppProgress = oppsIter.reduce { (a, b) =>
>>           if (a.eventTime.isBefore(b.eventTime)) a else b
>>         }
>>         // any close event after that opportunity event
>>         val closeWon = closeIter
>>           .filter(b => b.eventTime.isAfter(oppProgress.eventTime))
>>           .nonEmpty
>>
>>         new Opportunity(
>>           naicsCode = (oppProgress.properties.getOrElse[Int]("NaicsCode", 0).toString.take(3)) + "000",
>>           billingCountry = oppProgress.properties.getOrElse[String]("BillingCountry", ""),
>>           billingState = oppProgress.properties.getOrElse[String]("BillingState", ""),
>>           numberOfEmployees = employeeBand(oppProgress.properties.getOrElse[Int]("NumberOfEmployees", 0)),
>>           annualRevenue = revenueBand(oppProgress.properties.getOrElse[Int]("AnnualRevenue", 0)),
>>           close = closeWon
>>         )
>>       }
>> opportunity.take(2).foreach(println)
>>
>> val opportunitydf = opportunity.toDF().show()
>>
>> res:
>>
>> opportunity: org.apache.spark.rdd.RDD[Opportunity] = MapPartitionsRDD[59] at map at <console>:161
>> Opportunity(441000,United States,Virginia,10000Emp,$1B,false)
>> Opportunity(522000,United States,Virginia,10000Emp,$0,false)
>> +---------+--------------+------------+-----------------+-------------+-----+
>> |naicsCode|billingCountry|billingState|numberOfEmployees|annualRevenue|close|
>> +---------+--------------+------------+-----------------+-------------+-----+
>> |   441000| United States|    Virginia|         10000Emp|          $1B|false|
>> |   522000| United States|    Virginia|         10000Emp|           $0|false|
>> |   326000| United States|    Virginia|          5000Emp|           $0|false|
>> |   722000| United States|    Maryland|           100Emp|         $10M|false|
>>
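>> (Side note for anyone copying this: toDF() on an RDD of case classes needs
>> the Spark SQL implicits in scope. Something along these lines, though the
>> exact setup depends on your environment:)
>>
>> import org.apache.spark.sql.SQLContext
>>
>> val sqlContext = new SQLContext(sc)  // or reuse an existing SQLContext/SparkSession
>> import sqlContext.implicits._        // enables .toDF() on RDDs of case classes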
>>
>> *Here is where I am stuck. I am trying to get output similar to the above,
>> but I am not able to extract the fields from the map, convert the values
>> from JString, JDouble, etc. into plain Strings and numeric values in an
>> RDD, or convert the RDD to a dataframe.*
>>
>> *______________________________________________________________________________*
>>
>> 1 - The first step I take is to aggregate the $set events by
>> calling PEventStore.aggregateProperties and then pull out the Map. I am
>> not interested in the entityId or the first and last time.
>>
>>
>> val fieldsRDD = PEventStore.aggregateProperties(
>>       appName = "classi",
>>       entityType = "Opportunity")(sc).repartition(sc.defaultParallelism)
>>
>> val oppProperties = fieldsRDD.map {
>>       case (oppId, propMap) => propMap.fields
>>     }
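>>     // (fieldsRDD is an RDD of (entityId, PropertyMap) pairs, so after this
>>     //  map, oppProperties holds the raw Map[String, org.json4s.JValue] from
>>     //  each PropertyMap's fields member)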
>>
>> 2 - Then I create a case class and attempt to map over the oppProperties
>> RDD and create the flattened RDD in preparation to convert it to a data
>> frame.* This is where I am stuck.*
>>
>>
>> case class Opportunity(
>>   OpportunityId: String,
>>   ContactId: String,
>>   Amount: Double,
>>   OwnerId: String,
>>   ForecastCategoryName: String,
>>   AccountId: String
>> ) extends Serializable
>>
>> val opportunity: RDD[Opportunity] = oppProperties
>>     .map { row =>
>>
>>         new Opportunity(
>>             OpportunityId = row.get("OpportunityId"),
>>             ContactId = row.get("ContactId"),
>>             Amount = row.get("Amount"),
>>             OwnerId = row.get("OwnerId"),
>>             ForecastCategoryName = row.get("ForecastCategoryName"),
>>             AccountId = row.get("AccountId")
>>         )
>>     }
>>
>> opportunity.take(2).foreach(println)
>>
>> Error:
>>
>> type mismatch;
>> found : Option[org.json4s.JValue]
>> (which expands to) Option[org.json4s.JsonAST.JValue]
>> required: String
>> OpportunityId = row.get("OpportunityId"),
>> ^
>>
>> If I set every field in the case class to *Any* I can get one step further,
>> but I am assuming I need to call the *get* function differently to convert
>> *e.g. Some(JString("Text")) to just the Text*. With the types in the case
>> class *Opportunity* set to *Any* I get the following undesired RDD output;
>> however, I need the values to be converted so that the RDD looks like the
>> desired RDD below (one way to unwrap the values by hand is sketched after
>> the desired output).
>>
>> Undesired RDD:
>> Opportunity(Some(JString(006J000000RNvVYIA1)),Some(JString()),Some(JDouble(75000.0)),Some(JString(005J0000006a1Y1IAI)),Some(JString(Closed)),Some(JString(001J000001wndthIAA)))
>> Opportunity(Some(JString(006J000000SQopgIAD)),Some(JString()),Some(JDouble(14947.0)),Some(JString(005J0000006a1Y1IAI)),Some(JString(Closed)),Some(JString(001J000001xjXi0IAE)))
>>
>> Desired RDD:
>> Opportunity(006J000000RNvVYIA1,,75000.0,005J0000006a1Y1IAI,Closed,001J000001wndthIAA)
>> Opportunity(006J000000SQopgIAD,,14947.0,005J0000006a1Y1IAI,Closed,001J000001xjXi0IAE)
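>>
>> (As referenced above, one direction I have considered is unwrapping the
>> json4s values by hand - a sketch only, and the jStr/jDbl helper names are
>> mine:)
>>
>> import org.json4s._
>>
>> // Placeholder helpers that pattern match on the json4s AST
>> def jStr(fields: Map[String, JValue], key: String): String =
>>   fields.get(key) match {
>>     case Some(JString(s)) => s
>>     case _                => ""
>>   }
>>
>> def jDbl(fields: Map[String, JValue], key: String): Double =
>>   fields.get(key) match {
>>     case Some(JDouble(d)) => d
>>     case Some(JInt(i))    => i.toDouble
>>     case _                => 0.0
>>   }
>>
>> // e.g. OpportunityId = jStr(row, "OpportunityId"), Amount = jDbl(row, "Amount")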
>>
>> Thank you in advance for any advice or direction you can provide.
>>
>> Best,
>>
>> Shane
>>
>> Shane Johnson | 801.360.3350
>> LinkedIn <https://www.linkedin.com/in/shanewjohnson> | Facebook
>> <https://www.facebook.com/shane.johnson.71653>
>>
>>
>>
>
