ignite-dev mailing list archives

From percent620 <percent...@163.com>
Subject Embedded mode ignite on spark for cache data lost issues
Date Mon, 15 Aug 2016 01:24:30 GMT
I describe my issue in detail below. I run the same code in two scenarios: the first gives the correct result and the second does not.

I have been studying Ignite for the past few days but cannot get the correct result here. I hope someone can help me with this.

============================== 
1. Run Ignite in spark-shell
1) Start spark-shell with the Ignite jars and packages:
./spark-shell \
  --jars /u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-core-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-spark/ignite-spark-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/cache-api-1.0.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/ignite-log4j-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/log4j-1.2.17.jar \
  --packages org.apache.ignite:ignite-spark:1.6.0,org.apache.ignite:ignite-spring:1.6.0

2) Run the following code in spark-shell:
    import org.apache.ignite.spark._
    import org.apache.ignite.configuration._

    // third argument false: embedded mode (Ignite nodes are started inside the Spark executors)
    val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
    val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
    val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
    println("initalRDD.counter=/. " + initalRDD.count() + "\tpartitionCounter=> " + initalRDD.partitions.size)

    //sharedRDD.saveValues(initalRDD.map(line=>line._1))
    sharedRDD.savePairs(initalRDD, true) // overwrite existing entries in the Ignite cache
    println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\tigniteParitionCounter => " + sharedRDD.partitions.size)
    println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)

3) Result:
scala> import org.apache.ignite.spark._
import org.apache.ignite.spark._

scala> import org.apache.ignite.configuration._
import org.apache.ignite.configuration._

scala> val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(),false)
ic: org.apache.ignite.spark.IgniteContext[Int,Int] = org.apache.ignite.spark.IgniteContext@74e72ff4

scala> val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
sharedRDD: org.apache.ignite.spark.IgniteRDD[Int,Int] = IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:31

scala> val initalRDD = sc.parallelize(1 to 100000,10).map(i => (i, i))
initalRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:33

scala> println("initalRDD.counter=/. " + initalRDD.count() +"\tpartitionCounter=> " + initalRDD.partitions.size)
initalRDD.counter=/. 100000	partitionCounter=> 10

scala> sharedRDD.savePairs(initalRDD, true)//override cache on ignite

scala> println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\tigniteParitionCounter => " + sharedRDD.partitions.size)
=====>totalIgniteEmbedCounter100000      igniteParitionCounter => 1024

scala> println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
=====>totalIgniteFilterConditionEmbedCounter50000

*totalIgniteEmbedCounter is 100000: correct*
*totalIgniteFilterConditionEmbedCounter is 50000: correct*
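The two expected values follow directly from the input data. As a quick sanity check, here is a hypothetical local stand-in (the object name ExpectedCounts is made up, and plain Scala collections replace both the RDD and the Ignite cache). It replays the same (i, i) pairs and the same `_._2 > 50000` predicate, so values 50001 to 100000 are the ones that pass the filter:

```scala
// Hypothetical local stand-in for the spark-shell pipeline above:
// plain Scala collections replace sc.parallelize and the Ignite cache.
object ExpectedCounts {
  // Same pairs as initalRDD: (1,1), (2,2), ..., (100000,100000)
  val pairs: Seq[(Int, Int)] = (1 to 100000).map(i => (i, i))

  // Equivalent of sharedRDD.count when no entries are lost
  val total: Int = pairs.size

  // Same predicate as sharedRDD.filter(_._2 > 50000)
  val filtered: Int = pairs.count(_._2 > 50000)

  def main(args: Array[String]): Unit =
    println(s"total=$total filtered=$filtered")
}
```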
============================== 


2. IDEA project
1) Create a Maven project in IntelliJ IDEA.
2) Add the Ignite Maven dependencies as follows [1]:
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-core</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-indexing</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-visor-console</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spring</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spark</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-yarn</artifactId>
        <version>1.6.0</version>
    </dependency>
3) Code in the IDEA project (package com, matching --class com.TestIgniteEmbedCache below):
    package com

    import org.apache.ignite.configuration.IgniteConfiguration
    import org.apache.ignite.spark.IgniteContext
    import org.apache.spark.{SparkConf, SparkContext}

    object TestIgniteEmbedCache {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TestIgniteEmbedCache")
        val sc = new SparkContext(conf)

        //val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration().setIncludeEventTypes(EventType.EVT_TASK_FAILED), false)
        // third argument false: embedded mode (Ignite nodes are started inside the Spark executors)
        val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
        val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
        val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
        println("initalRDD.counter=/. " + initalRDD.count() + "\tpartitionCounter=> " + initalRDD.partitions.size)

        //sharedRDD.saveValues(initalRDD.map(line=>line._1))
        sharedRDD.savePairs(initalRDD, true) // overwrite existing entries in the Ignite cache
        println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\tigniteParitionCounter => " + sharedRDD.partitions.size)
        println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
      }
    }
4) Run mvn clean assembly:assembly to build sparkignitedemo.jar.

5) Upload the jar to our Linux driver machine and submit it to the YARN cluster with spark-submit:

/u01/spark-1.6.0-hive/bin/spark-submit --driver-memory 8G \
  --class com.TestIgniteEmbedCache --master yarn \
  --executor-cores 5 --executor-memory 1000m --num-executors 10 \
  --conf spark.rdd.compress=false --conf spark.shuffle.compress=false \
  --conf spark.broadcast.compress=false \
  /home/sparkignitedemo.jar


6) Result (this is the problem):
*totalIgniteEmbedCounter is 40000 or 3000 (it seems random)*
*totalIgniteFilterConditionEmbedCounter is 10000 or 2000 (random)*
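One way to picture how a partial count can appear is a scenario where the cache keeps no backup copies (0 backups is the CacheConfiguration default). In embedded mode the entries live inside the executor JVMs, so any partition whose owning node has left the topology is simply missing from sharedRDD.count. The toy model below assumes exactly that scenario. It is not Ignite's real affinity function, which hashes keys rather than taking a modulo. The names PartitionLossModel, partitionOf and ownerOf, the round-robin ownership and the node counts are all hypothetical, so the numbers are illustrative and do not reproduce the run above:

```scala
// Toy model (hypothetical): entries go missing when cache partitions that
// have no backups are owned by nodes that are no longer in the cluster.
object PartitionLossModel {
  // Matches "igniteParitionCounter => 1024" in the spark-shell output
  val Partitions = 1024

  // Hypothetical stand-in for Ignite's affinity function: key -> partition
  def partitionOf(key: Int): Int = key % Partitions

  // Hypothetical round-robin ownership: partition -> node id
  def ownerOf(partition: Int, nodes: Int): Int = partition % nodes

  // Counts keys whose owning node is still alive (no backup copies exist)
  def survivingCount(keys: Seq[Int], nodes: Int, alive: Set[Int]): Int =
    keys.count(k => alive.contains(ownerOf(partitionOf(k), nodes)))

  def main(args: Array[String]): Unit = {
    val keys = 1 to 100000
    // All 10 hypothetical nodes alive: every entry is counted
    println(survivingCount(keys, 10, (0 until 10).toSet))
    // Only 4 of 10 nodes alive: roughly 40% of the entries remain
    println(survivingCount(keys, 10, Set(0, 1, 2, 3)))
  }
}
```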
========================== 

This confuses me: why does the same code produce two different results? Can anyone help? I have been blocked on this issue for several days.

Thanks!!! 



--
View this message in context: http://apache-ignite-developers.2346864.n4.nabble.com/Embedded-mode-ignite-on-spark-for-cache-data-lost-issues-tp10685.html
Sent from the Apache Ignite Developers mailing list archive at Nabble.com.
