ignite-dev mailing list archives

From Valentin Kulichenko <valentin.kuliche...@gmail.com>
Subject Re: Embedded mode ignite on spark for cache data lost issues
Date Mon, 15 Aug 2016 18:49:06 GMT
There is already a discussion about this on the user list [1]. Let's
continue there, as this is not a dev-list topic. I will try to respond there
shortly.

[1]
http://apache-ignite-users.70518.x6.nabble.com/Embedded-mode-ignite-on-spark-td6942.html

-Val

On Sun, Aug 14, 2016 at 6:24 PM, percent620 <percent620@163.com> wrote:

> I will describe my issue in detail below. I run the same code in two
> scenarios, but the first one produces correct results and the second one
> does not.
>
> I have been studying Ignite for the last few days but cannot get a correct
> result. Hopefully someone can help me with this.
>
> ==============================
> 1. Run Ignite with spark-shell
> 1) ./spark-shell --jars
> /u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-core-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-spark/ignite-spark-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/cache-api-1.0.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/ignite-log4j-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/log4j-1.2.17.jar
> --packages org.apache.ignite:ignite-spark:1.6.0,org.apache.ignite:ignite-spring:1.6.0
>
> 2) Run the following code in spark-shell:
> val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
>     val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
>     val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
>     println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)
>
>     //sharedRDD.saveValues(initalRDD.map(line=>line._1))
>     sharedRDD.savePairs(initalRDD, true) // overwrite cache on ignite
>     println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
>     println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
>
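> For reference, here is my understanding of the third IgniteContext constructor argument (this is my assumption from reading the 1.6 API, not something I verified in the Ignite source); the helper below just spells that reading out in plain Scala:

```scala
// My assumption about IgniteContext's third argument in Ignite 1.6:
//
//   new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), true)
//     -> standalone/client mode: executors connect to an external Ignite
//        cluster, so cache data lives outside Spark.
//
//   new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
//     -> embedded mode: an Ignite server node is started inside each
//        executor, so cache partitions live (and die) with the executors.
//
// A tiny pure-Scala helper that makes that distinction explicit:
def igniteMode(standalone: Boolean): String =
  if (standalone) "client: external cluster holds the data"
  else "embedded: data lives inside Spark executors"

println(igniteMode(false)) // the mode both of my scenarios use
```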
> 3) Result as below:
> scala> import org.apache.ignite.spark._
> import org.apache.ignite.spark._
>
> scala> import org.apache.ignite.configuration._
> import org.apache.ignite.configuration._
>
> scala> val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
> ic: org.apache.ignite.spark.IgniteContext[Int,Int] = org.apache.ignite.spark.IgniteContext@74e72ff4
>
> scala> val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
> sharedRDD: org.apache.ignite.spark.IgniteRDD[Int,Int] = IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:31
>
> scala> val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
> initalRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:33
>
> scala> println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)
> initalRDD.counter=/. 100000     partitionCounter=> 10
>
> scala> sharedRDD.savePairs(initalRDD, true) // overwrite cache on ignite
>
> scala> println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
> =====>totalIgniteEmbedCounter100000      igniteParitionCounter => 1024
>
> scala> println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
> =====>totalIgniteFilterConditionEmbedCounter50000
>
> *totalIgniteEmbedCounter is 100000, which is correct.*
> *totalIgniteFilterConditionEmbedCounter is 50000, which is correct.*
> ==============================
>
>
> 2. IDEA project
> 1) Create a Maven project in IDEA.
> 2) Import the Ignite Maven dependencies as below:
>         <dependency>
>             <groupId>org.apache.ignite</groupId>
>             <artifactId>ignite-core</artifactId>
>             <version>1.6.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.ignite</groupId>
>             <artifactId>ignite-indexing</artifactId>
>             <version>1.6.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.ignite</groupId>
>             <artifactId>ignite-visor-console</artifactId>
>             <version>1.6.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.ignite</groupId>
>             <artifactId>ignite-spring</artifactId>
>             <version>1.6.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.ignite</groupId>
>             <artifactId>ignite-spark</artifactId>
>             <version>1.6.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.ignite</groupId>
>             <artifactId>ignite-yarn</artifactId>
>             <version>1.6.0</version>
>         </dependency>
> 3) Code as below for IDEA:
> object TestIgniteEmbedCache {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("TestIgniteEmbedCache")
>     val sc = new SparkContext(conf)
>
>     //val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration().setIncludeEventTypes(EventType.EVT_TASK_FAILED), false)
>     val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
>     val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
>     val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
>     println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)
>
>     //sharedRDD.saveValues(initalRDD.map(line=>line._1))
>     sharedRDD.savePairs(initalRDD, true) // overwrite cache on ignite
>     println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
>     println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
>   }
> }
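> As a sanity check on what the numbers should be if nothing is lost (plain Scala, no Spark or Ignite involved):

```scala
// Expected counts for keys 1..100000 with value == key.
val pairs = (1 to 100000).map(i => (i, i))
val total = pairs.size                      // what sharedRDD.count should return
val filtered = pairs.count(_._2 > 50000)    // what the filtered count should return
println(s"total=$total filtered=$filtered") // total=100000 filtered=50000
```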
> 4. Run mvn clean assembly:assembly to produce sparkignitedemo.jar.
>
> 5. Upload this jar to our Linux driver machine and submit it to the YARN
> cluster with the spark-submit command below:
>
> /u01/spark-1.6.0-hive/bin/spark-submit --driver-memory 8G \
>   --class com.TestIgniteEmbedCache --master yarn \
>   --executor-cores 5 --executor-memory 1000m --num-executors 10 \
>   --conf spark.rdd.compress=false --conf spark.shuffle.compress=false \
>   --conf spark.broadcast.compress=false \
>   /home/sparkignitedemo.jar
>
>
> 6. Result: this is where the issue appears.
> *totalIgniteEmbedCounter is 40000 or 3000 (it appears to be random).*
> *totalIgniteFilterConditionEmbedCounter is 10000 or 2000 (also random).*
> ==========================
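> My current guess (just a guess, not something I have confirmed) is that in embedded mode the cache data lives inside the executors, so when some executors exit after savePairs, their partitions disappear. A toy model of that guess in plain Scala:

```scala
// Toy model of my data-loss guess (pure Scala, no Ignite): 100000 keys spread
// over 10 embedded executor-local nodes; if only some executors survive,
// count() sees only their share of the data.
val keys = (1 to 100000).toVector
val byExecutor = keys.groupBy(_ % 10)           // 10 executors, 10000 keys each
val survivors = Set(0, 1, 2, 3)                 // suppose only 4 executors remain
val visible = byExecutor.filter { case (k, _) => survivors(k) }.values.flatten.size
println(visible) // same ballpark as the counts I observe
```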
>
> This result confuses me: why does the same code produce two different
> results? Can anyone help me with this? I have been blocked on this issue
> for several days.
>
> Thanks!!!
>
>
>
> --
> View this message in context: http://apache-ignite-developers.2346864.n4.nabble.com/Embedded-mode-ignite-on-spark-for-cache-data-lost-issues-tp10685.html
> Sent from the Apache Ignite Developers mailing list archive at Nabble.com.
>
