From: Hasan Can Saral
Date: Fri, 23 Sep 2016 18:46:12 +0300
Subject: Re: How to access Spark Context in predict?
To: user@predictionio.incubator.apache.org

Hi Marcin!

Thank you for your answer.

I only need the SparkContext, but I have no idea about the following:

1. How do I retrieve it from PersistentModelLoader?
2. How do I access sc in the predict method with the configuration below?

class SomeModel() extends LocalFileSystemPersistentModel[SomeAlgorithmParams] {
  override def save(id: String, params: SomeAlgorithmParams, sc: SparkContext): Boolean = {
    false
  }
}

object SomeModel extends LocalFileSystemPersistentModelLoader[SomeAlgorithmParams, SomeModel] {
  override def apply(id: String, params: SomeAlgorithmParams, sc: Option[SparkContext]): SomeModel = {
    new SomeModel() // HERE I TRAIN AND RETURN THE TRAINED MODEL
  }
}

Thank you very much, I really appreciate it!

Hasan

On Thu, Sep 22, 2016 at 7:05 PM, Marcin Ziemiński wrote:

> Hi Hasan,
>
> I think that your problem comes from using a deserialized RDD, which has
> already lost its connection to the SparkContext.
> A similar case can be found here:
> http://stackoverflow.com/questions/29567247/serializing-rdd
>
> If you really only need the SparkContext, you could probably use the one
> provided to PersistentModelLoader, which would be implemented by your
> model. Alternatively, you could implement PersistentModel so that the
> save method returns false. In that case your algorithm would be retrained
> on deploy, which would also provide you with an instance of SparkContext.
>
> Regards,
> Marcin
>
>
> On Thu, 22 Sep 2016 at 13:34, Hasan Can Saral <hasancansaral@gmail.com>
> wrote:
>
>> Hi!
>>
>> I am trying to query the Event Server with the PEventStore API in the
>> predict method, to fetch events per entity and build my features.
>> PEventStore needs sc, and for this I have:
>>
>> - Extended PAlgorithm
>> - Extended LocalFileSystemPersistentModel and
>>   LocalFileSystemPersistentModelLoader
>> - Put a dummy emptyRDD into my model
>> - Tried to access sc with model.dummyRDD.context, only to receive this
>>   error:
>>
>> org.apache.spark.SparkException: RDD transformations and actions can
>> only be invoked by the driver, not inside of other transformations; for
>> example, rdd1.map(x => rdd2.values.count() * x) is invalid because the
>> values transformation and count action cannot be performed inside of the
>> rdd1.map transformation. For more information, see SPARK-5063.
>>
>> This is the same error another user reported here in the
>> predictionio-user group. Any suggestions?
>>
>> Here's more of my predict method:
>>
>> def predict(model: SomeModel, query: Query): PredictedResult = {
>>
>>   val appName = sys.env.getOrElse[String]("APP_NAME", ap.appName)
>>
>>   var previousEvents = try {
>>     PEventStore.find(
>>       appName = appName,
>>       entityType = Some(ap.entityType),
>>       entityId = Some(query.entityId.getOrElse(""))
>>     )(model.dummyRDD.context).map(event => {
>>
>>       Try(new CustomEvent(
>>         Some(event.event),
>>         Some(event.entityType),
>>         Some(event.entityId),
>>         Some(event.eventTime),
>>         Some(event.creationTime),
>>         Some(new Properties(
>>           ...
>>         ))
>>       ))
>>     }).filter(_.isSuccess).map(_.get)
>>   } catch {
>>     case e: Exception => // fatal because of error, an empty query
>>       logger.error(s"Error when reading events: ${e}")
>>       throw e
>>   }
>>
>>   ...
>>
>> }
>>

--
Hasan Can Saral
hasancansaral@gmail.com
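
Marcin's first suggestion, using the SparkContext that is handed to the model loader, might be sketched roughly as follows. This is only an illustration under stated assumptions: StubSparkContext is a hypothetical stand-in for org.apache.spark.SparkContext so that the snippet runs without Spark or PredictionIO on the classpath, the save and apply signatures simply mirror the ones quoted in this thread, and SomeAlgorithm.predict is a simplified placeholder rather than the real predict method. It also assumes the loader actually receives Some(sc) at deploy time.

```scala
// Hypothetical stand-in for org.apache.spark.SparkContext (illustration only).
final class StubSparkContext(val appName: String)

// Stand-in for the algorithm parameters used in the thread.
final case class SomeAlgorithmParams(appName: String)

// The model keeps a reference to the context it was loaded with.
// @transient keeps the field out of Java serialization if the model is ever serialized.
class SomeModel(@transient val sc: StubSparkContext) {
  // Same shape as the save method in the thread: returning false reports
  // that nothing was persisted.
  def save(id: String, params: SomeAlgorithmParams, sc: StubSparkContext): Boolean = false
}

object SomeModel {
  // Same shape as the loader's apply in the thread: the context arrives as an Option.
  def apply(id: String, params: SomeAlgorithmParams, sc: Option[StubSparkContext]): SomeModel = {
    val context = sc.getOrElse(
      throw new IllegalStateException("the loader was called without a SparkContext"))
    new SomeModel(context) // training would happen here before returning the model
  }
}

object SomeAlgorithm {
  // Simplified predict: the context comes from the model, not from an RDD stored in it.
  def predict(model: SomeModel, entityId: String): String =
    s"would look up events for $entityId via context ${model.sc.appName}"
}
```

With a field like this, the predict method in the thread could pass model.sc where it currently passes model.dummyRDD.context, which avoids going through a deserialized RDD to reach the context.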