ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From begineer <redni...@gmail.com>
Subject Detecting terminal condition for group of items in Ignite cache.
Date Wed, 14 Dec 2016 11:02:15 GMT
Hi,
My sample application processes trades for different companies stored in
Ignite cache. When all trades for particular company reaches SUCCESS stage,
an automatic notification should be triggered and some other
system/application will react to it. To do this, When ever any trade reaches
SUCCESS stage, I detect it using Continuous query, I am comparing total size
of trades for particular company in cache and the trades for that company
which are in SUCCESS stage. If they are equal, trigger a notification else
wait.
There are two drawbacks with this approach.
1. I have to query through all the trades for that company twice every time
trade reach SUCCESS stage which is really really bad.
2. notification can be triggered multiple times if multiple threads
processing trades in parallel in SUCCESS stage.(i.e lets say last 5 items
reach SUCCESS stage together, so 5 emails will be sent since continuous
query will run local listener 5 times)

Is there a better way to do same task. I am hoping it is. My current
approach cannot work in real time application.

Below is my code.



public class TerminalEventsUsingContQuery {
	IgniteCache<Integer, Trade> cache;

	public static void main(String[] args) {
		new TerminalEventsUsingContQuery().test();
	}

	private void test() {

		Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
		CacheConfiguration<Integer, Trade> config = new
CacheConfiguration<>("TradesCache");

		cache = ignite.getOrCreateCache(config);
		ContinuousQuery<Integer, Trade> query = new ContinuousQuery<>();
		query.setLocalListener(events -> events.forEach(e ->
process(e.getValue())));

		query.setRemoteFilterFactory(factoryOf(e ->
TradeStatus.SUCCESS.equals(e.getValue().getStatus())));
		query.setInitialQuery(new ScanQuery<Integer, Trade>((k, v) ->
TradeStatus.SUCCESS.equals(v.getStatus())));
		buildData();
		QueryCursor<Entry&lt;Integer, Trade>> cursor = cache.query(query);
		cursor.forEach(entry -> process(entry.getValue()));
		Trade t9 = new Trade(9, TradeStatus.SUCCESS, "type1", 100);
		cache.put(t9.getId(), t9);
	}

	private void process(Trade trade) {
		List<Entry&lt;Integer, Trade>> totalperRef = cache
				.query(new ScanQuery<Integer, Trade>((k, v) -> v.getRef() ==
trade.getRef())).getAll();

		List<Entry&lt;Integer, Trade>> totalSuccessForRef = cache.query(new
ScanQuery<Integer, Trade>(
				(k, v) -> v.getRef() == trade.getRef() &&
TradeStatus.SUCCESS.equals(v.getStatus()))).getAll();

		if (totalperRef.size() == totalSuccessForRef.size()) {
			System.out.println("Terminal condition reached. Notify the handler for :
" + trade.getRef());
		} else {
			System.out.println("Terminal condition not reached yet. Current
processing Trade : " + trade.getId());
		}
	}

	private void buildData() {
		Trade t1 = new Trade(1, TradeStatus.SUCCESS, "type1", 100);
		Trade t2 = new Trade(2, TradeStatus.FAILED, "type1", 101);
		Trade t3 = new Trade(3, TradeStatus.EXPIRED, "type1", 102);
		Trade t4 = new Trade(4, TradeStatus.SUCCESS, "type1", 100);
		Trade t5 = new Trade(5, TradeStatus.CHANGED, "type1", 103);
		Trade t6 = new Trade(6, TradeStatus.SUCCESS, "type1", 100);
		Trade t7 = new Trade(7, TradeStatus.CHANGED, "type1", 103);
		Trade t8 = new Trade(8, TradeStatus.SUCCESS, "type1", 101);
		cache.put(t1.getId(), t1);
		cache.put(t2.getId(), t2);
		cache.put(t3.getId(), t3);
		cache.put(t4.getId(), t4);
		cache.put(t5.getId(), t5);
		cache.put(t6.getId(), t6);
		cache.put(t7.getId(), t7);
		cache.put(t8.getId(), t8);
	}
}

public class Trade { 
        private int id; 
        private TradeStatus status; 
        private String tradeType; 
        public Trade(int id, TradeStatus status, String tradeType) { 
                this.id = id; 
                this.status = status; 
                this.tradeType = tradeType; 
        } 
        
//setter getter, equals, hashcode methods 
}
 
public enum TradeStatus { 
        NEW, CHANGED, EXPIRED, FAILED, UNCHANGED , SUCCESS
}



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Detecting-terminal-condition-for-group-of-items-in-Ignite-cache-tp9526.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Mime
View raw message