ignite-user mailing list archives

From Ranjit Sahu <ranjit.s...@gmail.com>
Subject Re: Help needed
Date Fri, 10 Feb 2017 08:01:41 GMT
Hi Jörn,

The use case we are working on is something like this. We have three
parts: an extractEntity function, a resolve function, and an accessor
that resolve needs.

So first we build an RDD by calling the extract function in step 1 below.
Next we prepare the lookup RDD; let's call it the Accessor. Third is the
resolve function, which takes the RDD from step 1 and calls resolve, which
internally needs the Accessor, itself an RDD.
As we are working with legacy code, we can't do a join.

1) InputData -> extract function -> ExtRDD

2) Accessor -> LookupRDD

3) ExtRDD -> resolve function -> ExtRDD.map(x => resolve(Accessor))

With this we ran into the nested RDD issue and got the exception below.

17/01/19 10:18:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
2, c911tae.int.westgroup.com): org.apache.spark.SparkException: RDD
transformations and actions can only be invoked by the driver, not
inside of other transformations; for example, rdd1.map(x =>
rdd2.values.count() * x) is invalid because the values transformation
and count action cannot be performed inside of the rdd1.map
transformation. For more information, see SPARK-5063.

        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.toArray(RDD.scala:929)
        at ...
        at java.lang.Thread.run(Thread.java:745)
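
For what it's worth, the pattern we were attempting boils down to
something like this (a minimal hypothetical example, not our actual
code, assuming the sparkContext from the driver program); it compiles
but fails at runtime with exactly the SPARK-5063 error above:

// Sketch of the anti-pattern (illustrative names). lookupRDD is
// captured inside the closure passed to extRDD.map, so an executor
// would have to run an RDD operation itself, which only the driver
// may do.
val extRDD    = sparkContext.parallelize(Seq("a", "b", "c"))
val lookupRDD = sparkContext.parallelize(Seq("a" -> 1, "b" -> 2))

val resolved = extRDD.map { x =>
  lookupRDD.lookup(x) // INVALID: RDD action inside a transformation
}
resolved.collect() // throws SparkException, see SPARK-5063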


To work around this, the thought was to replace the Accessor, which is
an RDD, with an IgniteCache.
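
Conceptually, the resolve step then becomes an ordinary cache lookup
inside the closure, which Spark allows. A sketch with illustrative
names (MY_CACHE_NAME stands for whatever name the cache is created
under):

val resolved = extRDD.map { entityName =>
  // Each executor joins the Ignite topology and fetches the cache
  // locally; a cache lookup inside a closure is fine, unlike a
  // nested RDD operation.
  val ignite = Ignition.getOrStart(new IgniteConfiguration())
  val cache = ignite.getOrCreateCache[String, EntityVO](MY_CACHE_NAME)
  cache.get(entityName)
}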


Below is the code I am trying.

// Load the Avro content into a DataFrame
val dataframe = sqlContext.read
  .format("com.databricks.spark.avro")
  .option("header", "true")
  .load(sparkConf.get("spark.data.avroFiles"))


// Map each DataFrame Row to an EntityVO
val entityVOs = dataframe.map(row => {
      val wId = row.getAs[Long]("wId").toString
      val gId = row.getAs[String]("gId")
      val oaId = row.getAs[String]("oaId")
      val canName = row.getAs[String]("canName")
      // "variantName" may be absent, so guard against null
      val variants = row.getAs[mutable.WrappedArray[String]]("variantName")
      val variantName =
        if (variants != null) variants.toList.asJavaCollection
        else List[String]().asJavaCollection
      new EntityVO(wId, gId, oaId, canName, variantName)
    })
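
EntityVO itself is not shown here; for the SQL query further down to
work, the queried fields need Ignite's @QuerySqlField annotation. A
hypothetical shape consistent with the fields used above:

import java.util.{Collection => JCollection}
import org.apache.ignite.cache.query.annotations.QuerySqlField
import scala.annotation.meta.field

// Hypothetical EntityVO consistent with the fields used above. The
// @QuerySqlField annotation (applied to the underlying field via
// @field) makes a field queryable once setIndexedTypes registers
// the class with the cache configuration.
class EntityVO(
    @(QuerySqlField @field)(index = true) val wId: String,
    @(QuerySqlField @field) val gId: String,
    @(QuerySqlField @field) val oaId: String,
    @(QuerySqlField @field)(index = true) val canName: String,
    val variantName: JCollection[String]
) extends Serializable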

In the code below, which starts from the Spark driver program, I am
trying to build the cache on the worker nodes, wherever the function
executes:


 /** Convert each partition to Entity objects and load them into the cache **/

    entityVOs.mapPartitions(partition => {

      // Start an Ignite node on this executor, or reuse the running one
      val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
      val orgCacheCfg: CacheConfiguration[String, EntityVO] =
        new CacheConfiguration[String, EntityVO](MY_CACHE_NAME)
      orgCacheCfg.setIndexedTypes(classOf[String], classOf[EntityVO])
      orgCacheCfg.setCacheMode(CacheMode.PARTITIONED)
      val cache: IgniteCache[String, EntityVO] =
        ignite.getOrCreateCache(orgCacheCfg)
      var loaded = 0L
      while (partition.hasNext) {
        val entityVo = partition.next()
        cache.put(entityVo.wId, entityVo)
        loaded += 1
      }
      // Return the per-partition count; returning the exhausted iterator,
      // as before, would make count() come back as 0
      Iterator.single(loaded)
    }).count()
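
As an aside, for bulk loading like this Ignite's IgniteDataStreamer is
normally faster than per-entry cache.put. A sketch of the same partition
loop with a streamer, assuming the cache has already been created under
MY_CACHE_NAME:

import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration

// Sketch: bulk-load each partition with IgniteDataStreamer instead
// of per-entry put(). Assumes the cache MY_CACHE_NAME already exists.
entityVOs.mapPartitions(partition => {
  val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
  val streamer = ignite.dataStreamer[String, EntityVO](MY_CACHE_NAME)
  var loaded = 0L
  try {
    partition.foreach { vo =>
      streamer.addData(vo.wId, vo) // batched and flushed asynchronously
      loaded += 1
    }
  } finally {
    streamer.close() // flushes any remaining buffered entries
  }
  Iterator.single(loaded)
}).count()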


This is my first RDD, created for some sample entities:

val entityNames = Seq("YAMAHA MOTOR CORPORATION", "AVADO BRANDS ",
  "MADISON MINERALS ", "PALM ", "CAREERENGINE NETWORK INC",
  "KJEMHUS BULK SALES")

val entityNamesRDD = sparkContext.parallelize(entityNames)



Now the last step, calling the resolve function:


entityNamesRDD.map(entityName => {
  companyAccessor.getCompanyExact(entityName)
}).collect().foreach(println)


Below is my function, where I try to connect to Ignite again and do a query:


def getCompanyExact(entityName: String): Vector[EntityVO] = {
  val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
  // The cache name must match the one used on the loading side
  val cacheConfig: CacheConfiguration[String, EntityVO] =
    new CacheConfiguration[String, EntityVO](MY_CACHE_NAME)
  cacheConfig.setIndexedTypes(classOf[String], classOf[EntityVO])
  val wcaIgniteCache: IgniteCache[String, EntityVO] =
    ignite.getOrCreateCache(cacheConfig)
  // The queried field must match the field name on EntityVO
  val queryString = "canName = ?"
  val companyNameQuery =
    new SqlQuery[String, EntityVO]("EntityVO", queryString).setArgs(entityName)
  val results = wcaIgniteCache.query(companyNameQuery).getAll()
  val listIter = results.listIterator()
  val companyResults = ListBuffer[EntityVO]()
......

  println(companyResults)
  companyResults.toVector
}
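
One more note on the startup call used in both places above:
Ignition.getOrStart returns the node already running in the executor
JVM if there is one, and IgniteConfiguration.setClientMode controls
whether that node joins the topology as a server or a client. Whether
client mode is the right choice here is a guess on my part, not
something we have verified:

import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration

// Sketch of the shared node-startup call. getOrStart returns the
// node already running in this JVM, if any, so repeated calls on
// the same executor do not start extra nodes.
def igniteNode(clientMode: Boolean = false): Ignite = {
  val cfg = new IgniteConfiguration()
  cfg.setClientMode(clientMode) // client nodes hold no cache data
  Ignition.getOrStart(cfg)
}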










On Fri, Feb 10, 2017 at 11:00 AM, Jörn Franke <jornfranke@gmail.com> wrote:

> Not sure I got the picture of your setup, but the Ignite cache should be
> started independently of the application and not within the application.
>
> Aside from that, can you please elaborate more on the problem you would
> like to solve - maybe with pseudocode? I am not sure if the approach you
> have selected is the right one.
>
> > On 10 Feb 2017, at 04:23, Ranjit Sahu <ranjit.sahu@gmail.com> wrote:
> >
> > Hi Guys,
> >
> > We are trying to use the Ignite key-value store inside the Spark
> > cluster. What we are trying to do is:
> >
> > 1) Load the data into a data frame in Spark
> > 2) While doing a transformation on the data frame, on each executor
> > node we start the Ignite cache and load the data.
> >
> >
> > The issue we are seeing is that when I start with 40 executor nodes,
> > I can see only 24+ nodes in the Ignite topology. There is nothing in
> > the log with exceptions, and we are not seeing the full data available
> > in the cache either.
> >
> > We were not able to use Ignite's SharedRDD as we are kind of in a
> > nested RDD situation, and to avoid that we are trying to work with the
> > Ignite cache.
> >
> > Any clue what's going wrong here?
> >
> > Thanks,
> > Ranjit
> >
> >
>
