ignite-user mailing list archives

From cutch25 <onlycu...@gmail.com>
Subject Modify the underlying IgniteRDD Cache
Date Mon, 30 Oct 2017 15:44:46 GMT
Hi,
I am trying to work with and modify the underlying cache of an IgniteRDD,
but I cannot seem to access any of its values or iterate through it.

The premise behind the algorithm is that there is no good way to do this in
an RDD fashion, because each row operation modifies multiple row values. I can
do this locally on the driver, but I have too many rows to hold them all in
memory, hence I am trying a shared-memory solution.

Is there a way to access the underlying data of an IgniteRDD and modify it
locally?

Code example (Groovy, Spark):
// Load cache
CacheConfiguration mergeCacheConf = new CacheConfiguration<Long, Set>()
    .setName("merging")
    .setCacheMode(CacheMode.PARTITIONED)
    .setAtomicityMode(CacheAtomicityMode.ATOMIC)
    .setIndexedTypes(Long.class, Set.class)
ignite.addCacheConfiguration(mergeCacheConf)
ignite.getOrCreateCache(mergeCacheConf)
// Load RDD
Dataset rdd1 = sqlContext.createDataFrame(
    sc.parallelize([
        RowFactory.create([[(long)1,(long)2,(long)3,(long)4].toArray()].toArray()),
        RowFactory.create([[(long)4,(long)5,(long)6,(long)7].toArray()].toArray()),
        RowFactory.create([[(long)7,(long)8].toArray()].toArray())]),
    new StructType([new StructField("grouping",
        DataTypes.createArrayType(DataTypes.LongType), true,
        org.apache.spark.sql.types.Metadata.empty())].toArray() as StructField[]))
// Load RDD into ignite cache
JavaIgniteRDD irdd = ic.fromCache(mergeCacheConf)
irdd.savePairs(rdd1.withColumn("id", explode(col("grouping")))
    .select("id", "grouping")
    .toJavaRDD()
    .mapToPair({ new scala.Tuple2(it.get(0),
        JavaConverters.seqAsJavaListConverter(it.get(1)).asJava().toSet()) } as PairFunction))

IgniteCache icache = ignite.cache("merging")
println "?"+icache.get((Long)1) // Returns ?null
icache.put((Long)9, [9,10].toSet()) // Try setting a key (9)
println "names: "+ignite.cacheNames() // prints ["merging"]
// Prints my rows, but does not print the key 9 that I put
ic.fromCache("merging").take(20).each {println "--"+it}




