ignite-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Valentin Kulichenko <valentin.kuliche...@gmail.com>
Subject Re: ContinuousQueryWithTransformer implementation questions
Date Wed, 26 Jul 2017 23:48:50 GMT
Yeah, unfortunately current ContinuousQuery object can be used for querying
with transformer. That's actually not good, because adding transformers to
continuous queries and scan queries will be very inconsistent.

AFAIK, there are plans to completely rework query API since we added a lot
of stuff current API is not enough for (DML, DLL, etc.). Probably it makes
sense to consider transformers in the new API as well.

-Val

On Wed, Jul 26, 2017 at 1:32 PM, Nikolay Izhikov <nizhikov.dev@gmail.com>
wrote:

> Hello, Valentin.
>
> As far as I can understand `query(Query<T> qry, IgniteClosure<T, R>
> transformer)` is slightly different from what I should implement.
>
>
> I need to pass two parameter for ContinuousQuery instead of localListener:
>
> - Remote Transformer
> - Local Listener for transformed events
>
> and method you provide can accept only transformer.
>
> Moreover I think I should somehow "extend" ContinuousQuery(my proposal is
> new class with similar name) because issue is about possibility of
> optimization of continuous query mechanism.
>
> Thoughts?
>
>
> 26.07.2017 20:56, Valentin Kulichenko пишет:
>
> Nikolay,
>>
>> We already have the following method for queries with transformer. It
>> currently throws exception for ContinuousQuery.
>>
>> <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R>
transformer)
>>
>> Would it be possible to utilize it instead of creating new API?
>>
>> -Val
>>
>> On Wed, Jul 26, 2017 at 5:26 AM, Николай Ижиков <nizhikov.dev@gmail.com>
>> wrote:
>>
>> Hello, Igniters.
>>>
>>> I'm working on IGNITE-425 [1] issue.
>>> I made a couple of changes in my branch [2] so I want to confirm that
>>> changes with community before moving forward:
>>>
>>> Text of issue:
>>>
>>> ```
>>> Currently if updated entry passes the filter, it is sent to node
>>> initiated
>>> the query entirely.
>>> It would be good to provide user with the ability to transform entry and,
>>> for example,
>>> select only fields that are important. This may bring huge economy to
>>> traffic and lower GC pressure as well.
>>> ```
>>>
>>> 1. I create new class ContinuousQueryWithTransformer extends Query:
>>>
>>> Reasons to create entirely new class without extending ContinuousQuery:
>>>
>>>      a. ContinuousQuery is final so user can't extends it. I don't want
>>> to
>>> change that.
>>>      b. ContinuousQuery contains some deprecated
>>> methods(setRemoteFilter) so
>>> with new class we can get rid of them.
>>>      c. Such public API design disallow usage of existing
>>> localEventListener
>>> with new transformedEventListenr in compile time.
>>>
>>> ```
>>>      public final class ContinuousQueryWithTransformer<K, V, T> extends
>>> Query<Cache.Entry<K, V>> {
>>>          public ContinuousQueryWithTransformer<K, V, T>
>>> setRemoteFilterFactory(Factory<? extends CacheEntryEventFilter<K, V>>
>>> rmtFilterFactory) { /**/ }
>>>
>>>          public ContinuousQueryWithTransformer<K, V, T>
>>> setRemoteTransformerFactory(Factory<? extends IgniteBiClosure<K, V, T>>
>>> factory) { /**/ }
>>>
>>>          public ContinuousQueryWithTransformer<K, V, T>
>>> setLocalTransformedEventListener(TransformedEventListener<T>
>>> locTransEvtLsnr) { /**/ }
>>>
>>>          public interface TransformedEventListener<T> {
>>>              void onUpdated(Iterable<? extends T> events) throws
>>> CacheEntryListenerException;
>>>          }
>>>      }
>>> ```
>>>
>>> 2. I want to edit all tests from package
>>> `core/src/test/java/org/apach/ignite/internal/processors/
>>> cache/query/continuous/`
>>> to ensure my implementation fully support existing tests.
>>> I want to make each test can work both for regular ContinousQuery and
>>> ContinuousQueryWithTransformer:
>>>
>>> Existing test:
>>>
>>> ```
>>>          ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
>>>
>>>          qry.setLocalListener(new CacheEntryUpdatedListener<Object,
>>> Object>() {
>>>              @Override public void onUpdated(Iterable<CacheEntryEvent<?,
>>> ?>>
>>> evts) {
>>>                  for (CacheEntryEvent evt : evts) {
>>>                      if ((Integer)evt.getValue() >= 0)
>>>                          evtCnt.incrementAndGet();
>>>                  }
>>>              }
>>>          });
>>>
>>> ```
>>>
>>> To be:
>>>
>>> ```
>>>          Query qry = createContinuousQuery();
>>>
>>>          setLocalListener(qry, new CI1<T2<Object, Object>>() {
>>>              @Override public void apply(T2<Object, Object> e) {
>>>                  if ((Integer)e.getValue() >= 0)
>>>                      evtCnt.incrementAndGet();
>>>              }
>>>          });
>>> ```
>>>
>>> Base class to support setLocalListener:
>>>
>>> ```
>>>      protected <K, V> void setLocalListener(Query q, CI1<T2<K, V>>
>>> lsnrClsr)
>>> {
>>>          if (isContinuousWithTransformer()) {
>>>              ((ContinuousQueryWithTransformer)q)
>>>                  .setLocalTransformedEventListener(new
>>> TransformedEventListenerImpl(lsnrClsr));
>>>          } else
>>>              ((ContinuousQuery)q).setLocalListener(new
>>> CacheInvokeListener(lsnrClsr));
>>>      }
>>>
>>>      protected static class CacheInvokeListener<K, V>  {
>>>          private CI1<T2<K, V>> clsr;
>>>
>>>          @Override public void onUpdated(Iterable<CacheEntryEvent<?
>>> extends
>>> K, ? extends V>> events)
>>>              throws CacheEntryListenerException {
>>>              for (CacheEntryEvent<? extends K, ? extends V> e : events)
>>>                  clsr.apply(ignite, new T2<>(e.getKey(), e.getValue()));
>>>          }
>>>      }
>>>
>>>      protected static class TransformedEventListenerImpl<K, V>
>>> implements
>>> TransformedEventListener {
>>>          private IgniteBiInClosure<Ignite, T2<K, V>> clsr;
>>>
>>>          @Override public void onUpdated(Iterable evts) throws
>>> CacheEntryListenerException {
>>>              for (Object e : evts) {
>>>                  clsr.apply((T2)e);
>>>              }
>>>          }
>>>      }
>>> ```
>>>
>>> Thoughts?
>>>
>>> [1] https://issues.apache.org/jira/browse/IGNITE-425
>>> [2] https://github.com/nizhikov/ignite/pull/9/files
>>>
>>> --
>>> Nikolay Izhikov
>>> NIzhikov.dev@gmail.com
>>>
>>>
>>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message