ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Denis Mekhanikov <dmekhani...@gmail.com>
Subject Re: Using two ignite contexts with spark streaming
Date Wed, 01 Nov 2017 16:53:16 GMT
Hi!

I don't really see what you are trying to achieve.
In the example, that you provided, the second IgniteContext, called
ignitec2, is not used. Do you mean that when you start the second
IgniteContext, then both of them stop working?

When ignitec.fromCache("Data2") is executed, then cache is getting created,
if it haven't been yet. If you see in log that SQL execution fails because
of lacking tables, then query entities are probably not configured
correctly.

Try to find a minimal example, that doesn't work for you, and attach it
here as an archived project. It will be easier to tell what is wrong.

Denis

сб, 28 окт. 2017 г. в 23:47, manuelmourato <manuelmourato25@gmail.com>:

> Hello there,
>
> My use case is relatively simple: I want to be able to save an RDD in Spark
> to two different caches in Ignite, inside the same Spark Context.
>
> When I try to save an RDD to a single IgniteCache, everything works well:
>
>   case class Sensor_Att(
>                          @(QuerySqlField @field)(index = false)    active:
> String,
>                          @(QuerySqlField @field)(index = false)    `type`:
> String,
>                          @(QuerySqlField @field)(index = true)    name:
> String,
>                        )
>
>     val sqlContext: SparkSession =
>
> SparkSession.builder().master("local[*]").appName("DataProcessing").getOrCreate()
>     val sc: SparkContext = sqlContext.sparkContext
>      val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
>
>     val ignitec:IgniteContext = new IgniteContext(sc,()=>new
>
> IgniteConfiguration().setClientMode(false).setLocalHost("127.0.0.1").setActiveOnStart(true).
>       setCacheConfiguration(new
>
> CacheConfiguration[String,Sensor_Att]().setIndexedTypes(classOf[String],classOf[Sensor_Att]).setName("sensorData").
>
>
> setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC)).setDiscoverySpi(new
> TcpDiscoverySpi().
>       setLocalAddress("127.0.0.1").setLocalPort(47090).setIpFinder(new
> TcpDiscoveryMulticastIpFinder().
>       setAddresses(new util.ArrayList[String]))).setCommunicationSpi(new
> TcpCommunicationSpi().setLocalPort(47000)))
>
>
>       val cachedRDD:IgniteRDD[String,Sensor_Att]=ignitec.fromCache("Data1")
>       val  RDD_with_key: RDD[(String, Sensor_Att)]
> =df_RDD_NEW_CLASS.map(x=>(x.name,x))
>       cachedRDD.savePairs(RDD_with_key)
>       val df=cachedRDD.sql("select * from Sensor_Att")
>       df.show()
>
>
> If however I try to add a second IgniteContext, using the same class as an
> index, and try to save an RDD to its cache, like so:
>
> (code above...)
>  val ignitec=...
>  val ignitec2:IgniteContext = new IgniteContext(sc,()=>new
>
> IgniteConfiguration().setClientMode(false).setLocalHost("127.0.0.1").setActiveOnStart(true).
>       setCacheConfiguration(new
>
> CacheConfiguration[String,Sensor_Att]().setIndexedTypes(classOf[String],classOf[Sensor_Att]).setName("historicsensorData").
>
>
> setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC)).setDiscoverySpi(new
> TcpDiscoverySpi().
>       setLocalAddress("127.0.0.1").setLocalPort(47091).setIpFinder(new
> TcpDiscoveryMulticastIpFinder().
>       setAddresses(new util.ArrayList[String]))).setCommunicationSpi(new
> TcpCommunicationSpi().setLocalPort(47007)))
>
> (code above....)
>       val df=cachedRDD.sql("select * from Sensor_Att")
>       df.show()
>
>       val
> cachedRDD2:IgniteRDD[String,Sensor_Att]=ignitec.fromCache("Data2")
>       cachedRDD2.savePairs(RDD_with_key)
>       val df2=cachedRDD2.sql("select * from Sensor_Att")
>       df2.show()
>
>
> I get the following error:
>
> javax.cache.CacheException: class
> org.apache.ignite.internal.processors.query.IgniteSQLException: Failed to
> parse query: select * from Sensor_Att
>         at
>
> org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:807)
>         at
>
> org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:765)
>         at org.apache.ignite.spark.IgniteRDD.sql(IgniteRDD.scala:147)
>         at
> sensorApp.SensorDataProcessing$.sensorApp$SensorDataProcessing$$data_proces
>         (...)
>
>
> It seems that I can't derive a second IgniteContext from the same
> SparkContext, because it seems that the "Data2" cache was not created.
> Do you have any suggestions about this?
>
> Thank you.
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Mime
View raw message