From: Ranjit Sahu
Date: Fri, 10 Feb 2017 13:31:41 +0530
To: user@ignite.apache.org, jornfranke@gmail.com
Subject: Re: Help needed

Hi Jorn,

The use case we are working on is something like this: we have three functions, one to extract entities and one to resolve them, and the resolve step needs an accessor.

So first we build an RDD by calling the extract function in step 1 below. Next we prepare the lookup RDD; let's call it the Accessor. Third is the resolve function, which takes the RDD from step 1 and calls resolve, which internally needs to use the Accessor, which is again an RDD. As we are working with legacy code, we can't do a join.

1) Inputdata -> extract function -> ExtRDD
2) Accessor -> LookupRDD
3) ExtRDD -> resolve function -> ExtRDD.map(x => resolve(Accessor))

With this we ended up with the nested-RDD issue and got the exception below:

17/01/19 10:18:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, c911tae.int.westgroup.com): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.toArray(RDD.scala:929)
        at ...
        at java.lang.Thread.run(Thread.java:745)

To get around this, the idea was to replace the Accessor, which is an RDD, with an IgniteCache.
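Our step 3 has exactly the shape of the invalid example quoted in the exception message; a minimal standalone reproduction of the restriction (illustrative names only, not our real RDDs) would be:

// Minimal sketch of the SPARK-5063 nested-RDD restriction.
// rdd2.count() is an action, but here it runs inside rdd1's map closure on
// an executor, where no SparkContext is available, so Spark fails the task.
val rdd1 = sparkContext.parallelize(Seq(1, 2, 3))
val rdd2 = sparkContext.parallelize(Seq(10, 20, 30))
val broken = rdd1.map(x => rdd2.count() * x) // invalid: RDD used inside another RDD's closure
broken.collect()                             // the action that triggers the failure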
Below is the code I am trying.

// Imports used across the snippets below.
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.cache.query.SqlQuery
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}

// Load the content into a DataFrame.
val dataframe = sqlContext.read.format("com.databricks.spark.avro")
  .option("header", "true")
  .load(sparkConf.get("spark.data.avroFiles"))

// Serialize the DataFrame rows into the Scala case class.
val entityVOs = dataframe.map(row => {
  val wId = row.getAs[Long]("wId").toString
  val gId = row.getAs[String]("gId")
  val oaId = row.getAs[String]("oaId")
  val canName = row.getAs[String]("canName")
  var variantName = List[String]().asJavaCollection
  if (row.getAs[mutable.WrappedArray[String]]("variantName") != null)
    variantName = row.getAs[mutable.WrappedArray[String]]("variantName").toList.asJavaCollection
  new EntityVO(wId, gId, oaId, canName, variantName)
})

In the code below, which starts from the Spark driver program, I am trying to build the cache on the worker nodes, wherever the function executes.

/** Convert each partition to entity objects and load them into the cache. */
entityVOs.mapPartitions(x => {
  val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
  val orgCacheCfg: CacheConfiguration[String, EntityVO] =
    new CacheConfiguration[String, EntityVO](MY_CACHE_NAME)
  orgCacheCfg.setIndexedTypes(classOf[String], classOf[EntityVO])
  orgCacheCfg.setCacheMode(CacheMode.PARTITIONED)
  val cache: IgniteCache[String, EntityVO] = ignite.getOrCreateCache(orgCacheCfg)
  while (x.hasNext) {
    val entityVo = x.next()
    cache.put(entityVo.wId, entityVo)
  }
  x // the iterator is consumed at this point; count() below just forces execution
}).count()

This is my first RDD, created for some example entities:

val entityNames = Seq("YAMAHA MOTOR CORPORATION", "AVADO BRANDS ",
  "MADISON MINERALS ", "PALM ", "CAREERENGINE NETWORK INC",
  "KJEMHUS BULK SALES")
val entityNamesRDD = sparkContext.parallelize(entityNames)

Now the last step, calling the resolve function:

entityNamesRDD.map(entityName => {
  companyAccessor.getEntityExact(entityName)
}).collect().foreach(println)

Below is my function, where I try to connect to Ignite again and run a query:

def getEntityExact(entityName: String): Vector[EntityVO] = {
  val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
  val cacheConfig: CacheConfiguration[String, EntityVO] =
    new CacheConfiguration[String, EntityVO](MY_CACHE_NAME)
  cacheConfig.setIndexedTypes(classOf[String], classOf[EntityVO])
  val wcaIgniteCache: IgniteCache[String, EntityVO] = ignite.getOrCreateCache(cacheConfig)
  val queryString = "canonicalName = ?"
  val entityNameQuery = new SqlQuery[String, EntityVO]("EntityVO", queryString).setArgs(entityName)
  val results = wcaIgniteCache.query(entityNameQuery).getAll()
  val listIter = results.listIterator()
  val entityResults = ListBuffer[EntityVO]()
  // ... (result handling elided)
  println(entityResults)
  entityResults.toVector
}
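Regarding the suggestion below that the cluster should be started independently of the application: if we go that way, my understanding is that the executor side would join an external cluster as a client instead of starting its own server node. A rough sketch of that variant, where the discovery address is a placeholder and not our real setup:

// Sketch only (assumption, not our current code): executors connect as
// *clients* to an externally started Ignite cluster, so the cache data
// lives on dedicated server nodes rather than inside Spark executors.
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

def clientIgnite(): Ignite = {
  val ipFinder = new TcpDiscoveryVmIpFinder()
  ipFinder.setAddresses(Seq("ignite-host-1:47500..47509").asJava) // placeholder address
  val discoverySpi = new TcpDiscoverySpi()
  discoverySpi.setIpFinder(ipFinder)
  val cfg = new IgniteConfiguration()
  cfg.setClientMode(true)      // join as a client, not as a data-holding server
  cfg.setDiscoverySpi(discoverySpi)
  Ignition.getOrStart(cfg)     // reuses the node if this JVM already started one
}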
On Fri, Feb 10, 2017 at 11:00 AM, Jörn Franke <jornfranke@gmail.com> wrote:
> Not sure I got the picture of your setup, but the Ignite cache should be
> started independently of the application and not within the application.
>
> Aside from that, can you please elaborate more on the problem you would
> like to solve - maybe with pseudocode? I am not sure if the approach you
> have selected is the right one.
>
> > On 10 Feb 2017, at 04:23, Ranjit Sahu <ranjit.sahu@gmail.com> wrote:
> >
> > Hi Guys,
> >
> > We are trying to use the Ignite key-value store inside the Spark
> > cluster. What we are trying to do is:
> >
> > 1) Load the data into a data frame in Spark.
> > 2) While doing transformations on the data frame, we start the Ignite
> > cache on each executor node and load the data.
> >
> > The issue we are seeing is: when I start with 40 executor nodes, I can
> > see only 24+ nodes in the Ignite topology. There is nothing in the log
> > with exceptions, and we are not seeing the full data available in the
> > cache either.
> >
> > We were not able to use Ignite's shared RDD as we are kind of in a
> > nested-RDD situation, and to avoid that we are trying to work with the
> > Ignite cache.
> >
> > Any clue what's going wrong here?
> >
> > Thanks,
> > Ranjit
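PS: one way to sanity-check how many nodes actually joined would be to count them from the driver, roughly like this (a sketch only; a bare IgniteConfiguration assumes the default discovery we currently use):

// Hypothetical check from the driver: compare the Ignite topology size
// against the expected number of executor nodes.
val driverIgnite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
println(s"Nodes in Ignite topology: ${driverIgnite.cluster().nodes().size()}")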