ignite-user mailing list archives

From Harshal Patil <harshal.pa...@mindtickle.com>
Subject Re: Spark dataframe to Ignite write issue .
Date Mon, 25 Mar 2019 18:15:22 GMT
Can anyone please help here?

On Mon, Mar 25, 2019, 7:47 PM Harshal Patil <harshal.patil@mindtickle.com>
wrote:

> Hi,
> I am running Spark 2.3.1 with Ignite 2.7.0. I have configured Postgres as
> the cache persistence store. After loading the cache, I can read data from
> the Ignite cache and convert it to a Spark DataFrame. But when writing back
> to Ignite, I get the error below:
>
> class org.apache.ignite.internal.processors.query.IgniteSQLException: Table
> "ENTITY_PLAYABLE" not found; SQL statement:
>
> INSERT INTO
> ENTITY_PLAYABLE(GAMEID,PLAYABLEID,COMPANYID,VERSION,EVENTTIMESTAMP,EVENTTIMESTAMPSYS,COMPANYIDPARTITION,partitionkey)
> VALUES(?,?,?,?,?,?,?,?) [42102-197]
>
> at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamUpdateQuery(IgniteH2Indexing.java:1302)
> at org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2206)
> at org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2204)
> at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
>
>
>
> *Read from Ignite*:
>
> Loading the cache:
>
> import org.apache.ignite.Ignition
> import org.apache.ignite.lang.IgniteBiPredicate
> import org.apache.ignite.spark.IgniteContext
> import org.apache.spark.SparkConf
>
> val conf = new SparkConf()
> conf.setMaster("spark://harshal-patil.local:7077")
> // conf.setMaster("local[*]")
> conf.setAppName("IGniteTest")
> conf.set("spark.executor.heartbeatInterval", "900s")
> conf.set("spark.network.timeout", "950s")
> conf.set("spark.default.parallelism", "4")
> conf.set("spark.cores.max", "4")
> conf.set("spark.jars", "target/pack/lib/spark_ignite_cache_test_2.11-0.1.jar")
>
> // Closure that IgniteContext uses to obtain the Ignite configuration
> val cfg = () => ServerConfigurationFactory.createConfiguration()
>
> // Start an Ignite node in the driver JVM
> Ignition.start(ServerConfigurationFactory.createConfiguration())
>
> val ic: IgniteContext = new IgniteContext(sc, cfg)
>
> // Load the cache from the persistence store; a null predicate applies no filter
> ic.ignite().cache("EntityPlayableCache").loadCache(null.asInstanceOf[IgniteBiPredicate[_, _]])
>
>
>
>
> spark.read
>   .format(IgniteDataFrameSettings.FORMAT_IGNITE)
>   .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath)
>   .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE")
>   .load()
>   .select(sum("partitionkey").alias("sum"), count("gameId").as("total"))
>   .collect()(0)
>
>
> *Write to Ignite*:
>
> df.write
>   .format(IgniteDataFrameSettings.FORMAT_IGNITE)
>   .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath)
>   .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE")
>   .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "gameId,playableId,companyId,version")
>   .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, "true")
>   .mode(SaveMode.Append)
>   .save()
>
>
>
> I think the problem is with *Spring bean injection on the executor node*.
> Please help: what am I doing wrong?
>
>
>
>
