ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Pavel Tupitsyn <ptupit...@apache.org>
Subject Re: Using a cache as an affinity co-located processing buffer in Ignite.Net
Date Wed, 25 Apr 2018 05:19:14 GMT
What do you mean by "instance"? In terms of CLR that would be a different
instance on every node.

On Wed, Apr 25, 2018 at 2:50 AM, Raymond Wilson <raymond_wilson@trimble.com>
wrote:

> I’m using a Continuous Query in both options (grid deployed service using
> a CQ versus an independent context using a CQ). I was curious which context
> using a CQ would be seen as desirable.
>
>
>
> In the case where a filter is provided to a CQ for both the initial query
> and for newly items arriving in the cache I would need to supply the same
> filter instance for both as the processing logic has state that will need
> to be shared between the two. Once the CQ has been serialized to the remote
> nodes, will that filter be two separate instances or will is retain the
> same singular instance?
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Wednesday, April 25, 2018 6:08 AM
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> ContinuousQuery is the best practice for most kinds of streaming use
> cases. I think it fits your use case as well.
>
>
>
> On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> Thanks, that makes sense.
>
>
>
> From a best practices perspective, is better to have a grid deployed
> service on each node executing local continuous queries against the cache
> and orchestrating the processing from within the service, versus having
> some singular context in the grid that uses the continuous query by placing
> processing orchestration logic in the filter sent to the remote nodes?
>
> Sent from my iPhone
>
>
> On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <ptupitsyn@apache.org> wrote:
>
> Sorry, looks like I have misunderstood you.
>
>
>
> If you need initial scan, of course you can have it by using ScanQuery as
> initialQuery.
>
> Place all the processing logic into the ScanQuery filter, and return false
> from there.
>
> This way you can process all existing entries in a co-located fashion
> without sending them to the initiator node.
>
>
>
> Thanks,
>
> Pavel
>
>
>
> On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> Not being able to do an initial scan of elements on the remote nodes is a
> bit of a problem (possibly a bug?)
>
>
>
> Something that’s occurred to me is to wrap this behaviour into an Ignite
> service deployed onto all of the server nodes, and use a local mode
> continuous query from within each service to perform an initial scan of
> elements and then steady state handling as new elements arrive.
>
>
>
> The reason the initial scan is important is I need to handle cases where
> there may be a non-trivial queue of items waiting for processing and there
> is either a shutdown/restart of the grid, or there is a topology change
> event that triggers rebalancing
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Tuesday, April 24, 2018 5:54 AM
>
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> >  Is the initial query also run in the context of the remote node and
> the remote filter?
>
> No, it is just a query (can be SQL or Scan) which allows you to get a
> "full picture" on the calling node:
>
> all existing data and all future data.
>
>
>
> So in your scenario it is not very useful.
>
>
>
> >   return false from the filter so the element is not sent to the local
> listener
>
> Yes, exactly
>
>
>
> On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> OK – I see how that works.
>
>
>
> In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
> there is this code:
>
>
>
> using (var queryHandle = cache.QueryContinuous(qry, initialQry))
>
> {
>
>     // Iterate through existing data stored in cache.
>
>     foreach (var entry in queryHandle.GetInitialQueryCursor())
>
>         Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);
>
>
>
>     // Add a few more keys and watch a few more query notifications.
>
>     for (int i = 5; i < 15; i++)
>
>         cache.Put(i, i.ToString());
>
> }
>
>
>
> Is the initial query also run in the context of the remote node and the
> remote filter?
>
>
>
> Construction of the ContinuousQuery also requires provision of
> LocalListener to receive the cache update items. Is the approach here to
> processing the element in the remote filter context then return false from
> the filter so the element is not sent to the local listener?
>
>
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Monday, April 23, 2018 7:50 PM
>
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Remote Listener is deployed on every cache node and is invoked only on a
> primary node for that key.
>
> In other words, for each key there is only one invocation of the remote
> filter, and that invocation is local to that key.
>
>
>
> So you can place your processing logic into the Remote Filter.
>
>
>
> On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> Hi Pavel,
>
>
>
> Yes, I looked at continuous queries. They appear to be oriented toward a
> single context being sent the newly arrived elements in the cache from all
> primary nodes hosting the cache involved in the query.
>
>
>
> In the use case I outlined below, I would like to have the items processed
> in co-located contexts (ie: the data does not move and is processed in situ
> on the primary node). How do you do that with a continuous query?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Monday, April 23, 2018 7:18 PM
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Hi Raymond,
>
>
>
> To process incoming data in a co-located fashion there is a Continuous
> Query feature [1].
>
> Looks like it fits your use case quite well.
>
>
>
>
>
> [1] https://apacheignite-net.readme.io/docs/continuous-queries
>
>
>
> On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> I did find ICache.GetLocalEntries() method and have written the following
> as a proof of concept (yet to exercise it though):
>
>
>
>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});
>
>
>
>             ICacheEntry<BufferQueueKey, BufferQueueItem> first =
> localItems.FirstOrDefault();
>
>
>
>             if (first != null)
>
>             {
>
>                 // Get the list of all items in the buffer matching the
> affinity key of the first item
>
>                 // in the list, limiting the result set to 100 TAG files.
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
>                 if (candidates?.Count > 0)
>
>                 {
>
>                     // Submit the list of items to the processor
>
>                     // ...
>
>                 }
>
>             }
>
>
>
> This seems like it should do what I want, but I’m a little suspicious that
> it may evaluate the entire content of the cache against the Where()
> condition before taking the first 100 results.
>
>
>
> I think I can constrain it by modifying the LINQ expression like this:
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(100)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .ToList();
>
>
>
> Which will at least limit the overall number examined to be 100, while not
> capturing the first 100 that do match.
>
>
>
> I could further modify it to a ‘double-take’ which still constrains the
> overall query but improves the chances of filling the maximum take of 100
> matching items
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(1000)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
> Or is there a better way?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Raymond Wilson [mailto:raymond_wilson@trimble.com]
> *Sent:* Monday, April 23, 2018 1:11 PM
> *To:* user@ignite.apache.org
> *Subject:* Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
>
>
> All,
>
>
>
> I have been thinking about how to use Ignite.Net to support an affinity
> co-located ingest pipeline that uses queue buffering to provide fault
> tolerance and buffering for a flow of ingest packages.
>
>
>
> At a high level, it looks like this:
>
>
>
> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache,
> affinity co-located with PackageProcessor]
>
> Processing pipeline: [PackageCache] -> [PackageProcessor] ->
> [ProcessedDataCache affinity co-located with PackageProcessor]
>
>
>
> Essentially, I want a cache that look like this:
>
>
>
> Public class CacheItem
>
> {
>
>     Public DateTime date;
>
>
>
>   [AffinityKeyMapped]
>
>      public Guid AffinityKey;
>
>
>
>      public byte [] Package;
>
> }
>
>
>
>    ICache<string, CacheTime> BufferQueue.
>
>
>
> BufferQueue =  ignite.GetOrCreateCache <string, CacheItem > (
>
>                     new CacheConfiguration
>
>                     {
>
>                         Name = “BufferQueue”,
>
>
>
>                         KeepBinaryInStore = true,
>
>
>
>                         // Replicate the maps across nodes
>
>                         CacheMode = CacheMode.Partitioned,
>
>                     });
>
>             }
>
>
>
> This queue will target a data region that is configured for persistency.
>
>
>
> Inbound packages will arrive and be injected into the BufferQueue cache
> from some client node context, like this:
>
>
>
> public void HandleANewPackage(string key, Guid affinityKey, byte []
> package)
>
> {
>
> BufferQueue.Put(key, new CacheItem() {data = DateTime.Now(), AffinityKey =
> affinityKey, Package = package});
>
> }
>
>
>
> There will be a collection of server nodes that are responsible for the
> cache.
>
>
>
> This is all straightforward. The tricky bit is then processing the
> elements in the BufferQueue cache.
>
>
>
> The data is already on the server nodes, nicely co-located according to
> its affinity. I want to have parallel processing logic that runs on the
> server nodes that pulls elements from the buffer queue and processes them
> into some other cache(s).
>
>
>
> At this point I know I have a cache that may contain something needing to
> be processed, but I don’t know their keys. I know it’s possible to have
> logic running on each server node that does this (either as a Grid Service
> or a Compute::Broadcast() lambda):
>
>
>
> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>
> var cursor = cache.Query(new ScanQuery<string, CacheItem >(new QueryFilter
> ()));
>
>
>
> foreach (var cacheEntry in cursor)
>
>     ProcessItem(CacheEntry);
>
>
>
> …but I am not sure how to restrict the elements in the cache returned to
> the query to be only those entries affinity co-located with the server
> asking for them.
>
>
>
> Is this so obvious that it just works and does not need documentation, or
> is this not possible and I should run the processing context from a client
> node context (as above) and pay the penalty of extracting the packages from
> the cache with cache.Query() and then resubmitting them using an affinity
> aware method like AffinityRun()?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

Mime
View raw message