ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ANKIT SINGHAI <ankit...@gmail.com>
Subject Re: IgniteDataStreamer with Continuous Query
Date Sat, 31 Dec 2016 01:00:48 GMT
Can someone please have a look at the query below?

Thanks

On Dec 29, 2016 2:13 PM, "Ankit Singhai" <ankit284@gmail.com> wrote:

> Hi,
> In my test scenario I am pumping data into a cache via a data streamer. The
> cache has a Created expiry policy of 60 seconds, to give a sliding window of
> 60 seconds. After the initial burst I make my thread sleep and then pump
> data again, but for the 2nd burst I am not getting any events (local or
> remote); the only data I receive comes from the initial query. Can somebody
> point out what I am doing wrong here?
>
> Ignite Configuration
> <bean id="ignite.cfg"
> class="org.apache.ignite.configuration.IgniteConfiguration">
>         <property name="clientMode" value="true"/>
>         <property name="gridName" value="TestGrid"/>
>
>         <property name="peerClassLoadingEnabled" value="true"/>
>
>
>
>
>
>
>
>         <property name="discoverySpi">
>             <bean
> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>                 <property name="ipFinder">
>
>
>
>                     <bean
> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
>                         <property name="addresses">
>                             <list>
>
>                                 <value>127.0.0.1:47500..47509</value>
>                             </list>
>                         </property>
>                     </bean>
>                 </property>
>                 <property name="localAddress" value="127.0.0.1"/>
>
>                 <property name="heartbeatFrequency" value="2000"/>
>             </bean>
>         </property>
>
>         <property name="lifecycleBeans">
>             <list>
>                 <bean class="com.gvc.LifeCycleBeanImpl"
> id="lifeCycleBeanImpl"/>
>             </list>
>         </property>
>
>         <property name="communicationSpi">
>             <bean
> class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
>                 <property name="slowClientQueueLimit" value="1000"/>
>                 <property name="localPort" value="4321"/>
>             </bean>
>         </property>
>     </bean>
>
> Cache Configuration
>     <bean id="ipCache"
> class="org.apache.ignite.configuration.CacheConfiguration"
> scope="singleton">
>         <property name="name" value="ipCache"/>
>
>
>         <property name="cacheMode" value="REPLICATED"/>
>
>         <property name="memoryMode" value="ONHEAP_TIERED"/>
>
>
>         <property name="offHeapMaxMemory" value="#{256 * 1024L * 1024L}"/>
>
>
>         <property name="backups" value="0"/>
>
>
>         <property name="writeSynchronizationMode" value="PRIMARY_SYNC"/>
>
>
>         <property name="startSize" value="#{1 * 1024 * 1024}"/>
>
>
>         <property name="swapEnabled" value="false"/>
>
>
>         <property name="evictionPolicy">
>             <bean
> class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
>
>                 <property name="maxSize" value="1000"/>
>             </bean>
>         </property>
>
>
>         <property name="rebalanceMode" value="SYNC"/>
>
>
>         <property name="rebalanceBatchSize" value="#{1024 * 1024}"/>
>
>
>         <property name="rebalanceThrottle" value="0"/>
>
>
>         <property name="rebalanceThreadPoolSize" value="4"/>
>     </bean>
>
> Ignite Data Streamer Code via StreamTransformer
>
> CacheConfiguration<String,Integer> ipCacheConfiguration =
> (CacheConfiguration<String,Integer>)applicationContext.getBean("ipCache");
>         ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
>
> ipCacheConfiguration.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new
> CreatedExpiryPolicy(new Duration(SECONDS, 60))));
>
>         IgniteCache<String,Integer> ipCache =
> ignite.getOrCreateCache(ipCacheConfiguration);
>
>         Random RAND = new Random();
>
>         try (IgniteDataStreamer<String,Integer> igniteDataStreamer =
> ignite.dataStreamer(ipCache.getName())){
>             igniteDataStreamer.allowOverwrite(true);
>
>             igniteDataStreamer.receiver(new
> StreamTransformer<String,Integer>() {
>                 @Override
>                 public Object process(MutableEntry<String,Integer>
> mutableEntry, Object... objects) throws EntryProcessorException {
>                     Integer val = mutableEntry.getValue();
>
>                     // Increment count by 1.
>                     mutableEntry.setValue(val == null ? (int) 1L : val +
> 1);
>
>                     return null;
>                 }
>             });
>
>             int range = 1000;
>             for(int i = 1 ; i <= 100000 ; i++) {
>                 igniteDataStreamer.addData(""+RAND.nextInt(range), 1);
>             }
>
>             try {
>                 Thread.sleep(70000);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>
>             System.out.println("After Sleeping");
>             for(int i = 1 ; i <= 100000 ; i++) {
>                 igniteDataStreamer.addData(""+RAND.nextInt(range), 1);
>             }
>         }
>
> Continuous Query Code (running on a different JVM)
> CacheConfiguration<String,Integer> ipCacheConfiguration =
> (CacheConfiguration<String,Integer>)applicationContext.getBean("ipCache");
>         ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
>
> ipCacheConfiguration.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new
> CreatedExpiryPolicy(new Duration(SECONDS, 60))));
>
>         IgniteCache<String,Integer> ipCache =
> ignite.getOrCreateCache(ipCacheConfiguration);
>
>         ContinuousQuery<String,Integer> continuousQuery = new
> ContinuousQuery<>();
>         continuousQuery.setInitialQuery(new ScanQuery<String, Integer>(new
> IgniteBiPredicate<String, Integer>() {
>             @Override
>             public boolean apply(String integer, Integer integer2) {
>                 return integer2 > 100;
>             }
>         }));
>
>         continuousQuery.setLocalListener(new
> CacheEntryUpdatedListener<String, Integer>() {
>             @Override
>             public void onUpdated(Iterable<CacheEntryEvent<? extends
> String, ? extends Integer>> iterable) throws CacheEntryListenerException {
>                 for (CacheEntryEvent events : iterable) {
>                     System.out.println(" Inside local listener :: " +
> events);
>                 }
>             }
>         });
>
>         continuousQuery.setRemoteFilterFactory(new
> Factory<CacheEntryEventFilter<String, Integer>>() {
>             @Override
>             public CacheEntryEventFilter<String, Integer> create() {
>                 return new CacheEntryEventFilter<String, Integer>() {
>                     @Override
>                     public boolean evaluate(CacheEntryEvent<? extends
> String, ? extends Integer> cacheEntryEvent) throws
> CacheEntryListenerException {
>                         System.out.println("Remote Listener");
>                         return cacheEntryEvent.getValue() > 100;
>                     }
>                 };
>             }
>         });
>
>         try (QueryCursor<Cache.Entry<String, Integer>> cur =
> ipCache.query(continuousQuery)){
>             for(Cache.Entry<String,Integer> c : cur) {
>                 System.out.println(c);
>             }
>         }
>
> Thanks
>
>
>
> --
> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/IgniteDataStreamer-with-Continuous-Query-tp9779p9795.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Mime
View raw message