ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ankit Singhai <ankit...@gmail.com>
Subject Re: IgniteDataStreamer with Continuous Query
Date Thu, 29 Dec 2016 08:43:29 GMT
Hi,
In my test case scenario I am pumping data into cache via data streamer
which has Created expiration policy of 60 seconds to have sliding window of
60 seconds, after the initial bursts I am making my thread to sleep then
again pump data, but for the 2nd bursts I am not getting any events (local
or remote) only the data I receive is after initial search. Can somebody
point out what I am doing wrong here?

Ignite Configuration
<bean id="ignite.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="clientMode" value="true"/>
        <property name="gridName" value="TestGrid"/>

        <property name="peerClassLoadingEnabled" value="true"/>

        
        

        
        

        <property name="discoverySpi">
            <bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    
                    
                    
                    <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
                <property name="localAddress" value="127.0.0.1"/>
                
                <property name="heartbeatFrequency" value="2000"/>
            </bean>
        </property>

        <property name="lifecycleBeans">
            <list>
                <bean class="com.gvc.LifeCycleBeanImpl"
id="lifeCycleBeanImpl"/>
            </list>
        </property>

        <property name="communicationSpi">
            <bean
class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="slowClientQueueLimit" value="1000"/>
                <property name="localPort" value="4321"/>
            </bean>
        </property>
    </bean>

Cache Configuration
    <bean id="ipCache"
class="org.apache.ignite.configuration.CacheConfiguration"
scope="singleton">
        <property name="name" value="ipCache"/>

        
        <property name="cacheMode" value="REPLICATED"/>
        
        <property name="memoryMode" value="ONHEAP_TIERED"/>

        
        <property name="offHeapMaxMemory" value="#{256 * 1024L * 1024L}"/>

        
        <property name="backups" value="0"/>

        
        <property name="writeSynchronizationMode" value="PRIMARY_SYNC"/>

        
        <property name="startSize" value="#{1 * 1024 * 1024}"/>

        
        <property name="swapEnabled" value="false"/>

        
        <property name="evictionPolicy">
            <bean
class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
                
                <property name="maxSize" value="1000"/>
            </bean>
        </property>

        
        <property name="rebalanceMode" value="SYNC"/>

        
        <property name="rebalanceBatchSize" value="#{1024 * 1024}"/>

        
        <property name="rebalanceThrottle" value="0"/>

        
        <property name="rebalanceThreadPoolSize" value="4"/>
    </bean>

Ignite Data Streamer Code via StreamTransformer

CacheConfiguration<String,Integer> ipCacheConfiguration =
(CacheConfiguration<String,Integer>)applicationContext.getBean("ipCache");
        ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
       
ipCacheConfiguration.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new
CreatedExpiryPolicy(new Duration(SECONDS, 60))));

        IgniteCache<String,Integer> ipCache =
ignite.getOrCreateCache(ipCacheConfiguration);

        Random RAND = new Random();

        try (IgniteDataStreamer<String,Integer> igniteDataStreamer =
ignite.dataStreamer(ipCache.getName())){
            igniteDataStreamer.allowOverwrite(true);

            igniteDataStreamer.receiver(new
StreamTransformer<String,Integer>() {
                @Override
                public Object process(MutableEntry<String,Integer>
mutableEntry, Object... objects) throws EntryProcessorException {
                    Integer val = mutableEntry.getValue();

                    // Increment count by 1.
                    mutableEntry.setValue(val == null ? (int) 1L : val + 1);

                    return null;
                }
            });

            int range = 1000;
            for(int i = 1 ; i <= 100000 ; i++) {
                igniteDataStreamer.addData(""+RAND.nextInt(range), 1);
            }

            try {
                Thread.sleep(70000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("After Sleeping");
            for(int i = 1 ; i <= 100000 ; i++) {
                igniteDataStreamer.addData(""+RAND.nextInt(range), 1);
            }
        }

Continuous Query Code (running on an different JVM)
CacheConfiguration<String,Integer> ipCacheConfiguration =
(CacheConfiguration<String,Integer>)applicationContext.getBean("ipCache");
        ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
       
ipCacheConfiguration.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new
CreatedExpiryPolicy(new Duration(SECONDS, 60))));

        IgniteCache<String,Integer> ipCache =
ignite.getOrCreateCache(ipCacheConfiguration);

        ContinuousQuery<String,Integer> continuousQuery = new
ContinuousQuery<>();
        continuousQuery.setInitialQuery(new ScanQuery<String, Integer>(new
IgniteBiPredicate<String, Integer>() {
            @Override
            public boolean apply(String integer, Integer integer2) {
                return integer2 > 100;
            }
        }));

        continuousQuery.setLocalListener(new
CacheEntryUpdatedListener<String, Integer>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent&lt;? extends
String, ? extends Integer>> iterable) throws CacheEntryListenerException {
                for (CacheEntryEvent events : iterable) {
                    System.out.println(" Inside local listener :: " +
events);
                }
            }
        });

        continuousQuery.setRemoteFilterFactory(new
Factory<CacheEntryEventFilter&lt;String, Integer>>() {
            @Override
            public CacheEntryEventFilter<String, Integer> create() {
                return new CacheEntryEventFilter<String, Integer>() {
                    @Override
                    public boolean evaluate(CacheEntryEvent<? extends
String, ? extends Integer> cacheEntryEvent) throws
CacheEntryListenerException {
                        System.out.println("Remote Listener");
                        return cacheEntryEvent.getValue() > 100;
                    }
                };
            }
        });

        try (QueryCursor<Cache.Entry&lt;String, Integer>> cur =
ipCache.query(continuousQuery)){
            for(Cache.Entry<String,Integer> c : cur) {
                System.out.println(c);
            }
        }

Thanks



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/IgniteDataStreamer-with-Continuous-Query-tp9779p9795.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Mime
View raw message