spark-user mailing list archives

From Bryan Jeffrey <bryan.jeff...@gmail.com>
Subject Re: Example of updateStateByKey with initial RDD?
Date Thu, 08 Oct 2015 12:18:46 GMT
Aniket,

Thank you for the example - but that's not quite what I'm looking for.
I've got a call to updateStateByKey that looks like the following:

dstream.map(x => (x.keyWithTime, x))
.updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole)

def updateCountsOfProcessGivenRole(a: Seq[CountsOfProcessGivenRole],
    b: Option[CountsOfProcessGivenRole]): Option[CountsOfProcessGivenRole] = {
  val currentTime = DateTime.now(DateTimeZone.UTC)
  if (a.isEmpty) {
    // No new values for this key: drop state older than three days
    if (b.get.eventhourbin.plusDays(3).getMillis < currentTime.getMillis) {
      None
    } else {
      b
    }
  } else { // a is not empty - b may or may not be defined
    // Parenthesized so the new values are added whether or not b is defined
    val population = (if (b.isDefined) b.get.Population else 0) + a.map(x => x.Population).sum
    val subpopulation = (if (b.isDefined) b.get.Subpopulation else 0) + a.map(x => x.Subpopulation).sum
    Some(CountsOfProcessGivenRole(a(0), population, subpopulation))
  }
}

This works fine. However, when I add an initial RDD by modifying the
'updateStateByKey' call as follows:

val initialProcessGivenRoleRdd: RDD[((String, String, DateTime), CountsOfProcessGivenRole)] =
  iPrProcessGivenRole.map(x => (x.keyWithTime, x))

dstream.map(x => (x.keyWithTime, x))
  .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole,
    new HashPartitioner(3), initialProcessGivenRoleRdd)

I get a compile error: 'missing arguments for method
updateCountsOfProcessGivenRole'. Looking at the method signatures, both
forms of the call appear to expect the same function type. I was hoping an
example might shed some light on the issue.
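
One possible explanation, not confirmed anywhere in this thread: the three-argument form of updateStateByKey is overloaded, and on older Scala 2 compilers a bare method name passed to an overloaded method may not be converted to a function value automatically, which is reported as 'missing arguments for method ...'. The sketch below reproduces the pattern in plain Scala without Spark. The object EtaExpansionSketch, the two update overloads, and updateCounts are hypothetical stand-ins for illustration, not Spark's API.

```scala
object EtaExpansionSketch {
  // Hypothetical stand-ins for two three-argument overloads whose first
  // parameter has a different function type (not Spark's real signatures).
  def update(f: (Seq[Int], Option[Int]) => Option[Int],
             partitions: Int,
             initial: Map[String, Int]): String = "seq-overload"

  def update(f: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)],
             partitions: Int,
             rememberPartitioner: Boolean): String = "iterator-overload"

  // A plain method, playing the role of updateCountsOfProcessGivenRole.
  def updateCounts(values: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + values.sum)

  // Passing the bare method name to the overloaded `update` may be rejected
  // on older compilers with "missing arguments for method updateCounts":
  //   update(updateCounts, 3, Map.empty[String, Int])

  // Option 1: ascribe the function type so the conversion has an expected type.
  val asFunction: (Seq[Int], Option[Int]) => Option[Int] = updateCounts
  val viaTypedVal: String = update(asFunction, 3, Map.empty[String, Int])

  // Option 2: wrap the method in an explicit function literal.
  val viaLambda: String =
    update((vs: Seq[Int], st: Option[Int]) => updateCounts(vs, st), 3, Map.empty[String, Int])

  def main(args: Array[String]): Unit = {
    println(viaTypedVal)
    println(viaLambda)
  }
}
```

If this is indeed the cause, the equivalent change to the call above would be to pass `ProbabilityCalculator.updateCountsOfProcessGivenRole _` (the trailing underscore that Scala 2 accepts for turning a method into a function value), or to wrap the method in a function literal as in option 2.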

Regards,

Bryan Jeffrey

On Thu, Oct 8, 2015 at 7:04 AM, Aniket Bhatnagar <aniket.bhatnagar@gmail.com> wrote:

> Here is an example:
>
> val interval = 60 * 1000
> val counts = eventsStream.map(event => {
>   (event.timestamp - event.timestamp % interval, event)
> }).updateStateByKey[Long](updateFunc =
>   (events: Seq[Event], prevStateOpt: Option[Long]) => {
>   val prevCount = prevStateOpt.getOrElse(0L)
>   val newCount = prevCount + events.size
>   Some(newCount)
> })
> counts.print()
>
> Hope it helps!
>
> Thanks,
> Aniket
>
> On Thu, Oct 8, 2015 at 4:29 PM Bryan <bryan.jeffrey@gmail.com> wrote:
>
>> Hello,
>>
>> Can anyone point me to a good example of updateStateByKey with an initial
>> RDD? I am seeing a compile time error when following the API.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>
