ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Supun Nakandala <supun.nakand...@gmail.com>
Subject What is the most efficient way to scan all data partitions?
Date Wed, 09 May 2018 08:57:59 GMT
Hi devs,

I have a dataset that needs to be iteratively processed and which is larger
than the available total cluster memory. So I am using Ignite native
persistence and the compute broadcast method to distribute the processing logic.

This works really well on small data (which can fit in memory) but is
significantly slower on larger datasets. When I checked the data reading
throughput using the "iotop" command, it showed Ignite reading data at a very
slow rate (< 2 MB/s).

I have attached a stripped-down version of my code below. I am using a Scan
Query to iterate through all the data. Is there a better alternative for this?

Thank you
-Supun

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import
org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;

import javax.cache.Cache;
import java.util.Iterator;

/**
 * Reproducer for slow full-data scans when Ignite native persistence is enabled.
 *
 * <p>Phase 1 loads synthetic {@code Float[]} payloads into cache "S" from each
 * data node (one streamer per node, entries spread over the node's primary
 * partitions). Phase 2 scans every local primary partition of "S" with a
 * partition-pinned local {@link ScanQuery} and streams the entries into cache "T".
 *
 * <p>Fixes over the original post:
 * <ul>
 *   <li>Data streamers and query cursors are closed via try-with-resources —
 *       an unclosed streamer never flushes its buffered entries, and an
 *       unclosed cursor leaks scan resources on the data node.</li>
 *   <li>Closures resolve {@code Ignition.localIgnite()} on the data node
 *       instead of capturing the client-side {@code Ignite}/cache proxies.</li>
 *   <li>Progress is printed once per partition; a println per scanned entry
 *       dominates the scan time being measured.</li>
 *   <li>The duplicated cache configuration is factored into a helper.</li>
 * </ul>
 */
public class SlowDurableMemoryScan {

    /** Partition count for both caches (matches the original hard-coded 1024). */
    private static final int PARTITIONS = 1024;

    /** Entries written per local primary partition during initial loading. */
    private static final int ENTRIES_PER_PARTITION = 16;

    public static void main(String[] args) {
        Ignition.setClientMode(true);

        System.out.println("Starting test...");
        try (Ignite ignite = Ignition.start("../../config/cloudlab-ignite.xml")) {

            // Persistence-enabled clusters start inactive; activate before any cache ops.
            ignite.active(true);

            ignite.getOrCreateCache(cacheConfiguration("S"));
            ignite.getOrCreateCache(cacheConfiguration("T"));

            System.out.println("Starting initial loading...");
            long prevTime = System.currentTimeMillis();
            ignite.compute(ignite.cluster().forDataNodes("S")).broadcast(() -> {
                // Resolve the node-local Ignite instead of capturing the client proxy.
                Ignite local = Ignition.localIgnite();
                int[] partitions = local.affinity("S").primaryPartitions(local.cluster().localNode());

                // try-with-resources: closing the streamer flushes any buffered
                // entries before the broadcast job completes.
                try (IgniteDataStreamer<Float, Float[]> streamerS = local.dataStreamer("S")) {
                    streamerS.perNodeBufferSize(100);

                    for (int partition : partitions) {
                        System.out.println("Partition: " + partition);
                        for (int i = 0; i < ENTRIES_PER_PARTITION; i++) {
                            // Key lands in [partition, partition + 1); value is a ~4 MB array.
                            streamerS.addData((float) (partition + 1.0 / (i + 1)), new Float[1024 * 1024]);
                        }
                    }
                }
            });
            System.out.println("Data loading time(s): " + ((System.currentTimeMillis() - prevTime) / 1000));

            // Scan the data and stream it into "T".
            prevTime = System.currentTimeMillis();
            ignite.compute(ignite.cluster().forDataNodes("S")).broadcast(() -> {
                Ignite local = Ignition.localIgnite();
                int[] partitions = local.affinity("S").primaryPartitions(local.cluster().localNode());
                IgniteCache<Float, Float[]> localS = local.cache("S");

                try (IgniteDataStreamer<Float, Float[]> streamerT = local.dataStreamer("T")) {
                    streamerT.perNodeBufferSize(100);

                    for (int partition : partitions) {
                        // Local, partition-pinned scan: reads only this node's copy
                        // of one partition at a time.
                        ScanQuery<Float, Float[]> query = new ScanQuery<>();
                        query.setLocal(true);
                        query.setPartition(partition);

                        long count = 0;
                        // Close the cursor to release scan resources held on this node.
                        try (QueryCursor<Cache.Entry<Float, Float[]>> cursor = localS.query(query)) {
                            for (Cache.Entry<Float, Float[]> entry : cursor) {
                                streamerT.addData(entry.getKey(), entry.getValue());
                                count++;
                            }
                        }
                        // Log once per partition — a println per entry throttles the scan.
                        System.out.println("Partition " + partition + " count: " + count);
                    }
                }
            });
            System.out.println("Data update time(s): " + ((System.currentTimeMillis() - prevTime) / 1000));
        }
    }

    /**
     * Builds the shared cache configuration used by both "S" and "T":
     * partitioned, persisted in "Default_Region", no backups, 1024 partitions,
     * neighbor exclusion enabled.
     *
     * @param name cache name
     * @return a fresh configuration for {@code getOrCreateCache}
     */
    private static CacheConfiguration<Float, Float[]> cacheConfiguration(String name) {
        CacheConfiguration<Float, Float[]> cfg = new CacheConfiguration<>(name);
        cfg.setCacheMode(CacheMode.PARTITIONED);
        cfg.setDataRegionName("Default_Region");
        cfg.setStoreKeepBinary(true);
        cfg.setBackups(0);
        cfg.setAffinity(new RendezvousAffinityFunction().setExcludeNeighbors(true).setPartitions(PARTITIONS));
        return cfg;
    }
}

Mime
View raw message