From: Valentin Kulichenko
Date: Wed, 26 Jul 2017 16:48:50 -0700
Subject: Re: ContinuousQueryWithTransformer implementation questions
To: Nikolay Izhikov, dev@ignite.apache.org

Yeah, unfortunately the current ContinuousQuery object can't be used for
querying with a transformer. That's actually not good, because adding
transformers to continuous queries and scan queries will be very
inconsistent.

AFAIK, there are plans to completely rework the query API, since we have
added a lot of features that the current API does not cover (DML, DDL,
etc.). It probably makes sense to consider transformers in the new API as
well.

-Val

On Wed, Jul 26, 2017 at 1:32 PM, Nikolay Izhikov wrote:

> Hello, Valentin.
>
> As far as I can understand, `query(Query qry, IgniteClosure
> transformer)` is slightly different from what I should implement.
>
> I need to pass two parameters to ContinuousQuery instead of localListener:
>
> - Remote Transformer
> - Local Listener for transformed events
>
> and the method you provided can accept only a transformer.
>
> Moreover, I think I should somehow "extend" ContinuousQuery (my proposal is
> a new class with a similar name), because the issue is about the
> possibility of optimizing the continuous query mechanism.
>
> Thoughts?
>
> 26.07.2017 20:56, Valentin Kulichenko wrote:
>
>> Nikolay,
>>
>> We already have the following method for queries with a transformer. It
>> currently throws an exception for ContinuousQuery.
>>
>> QueryCursor query(Query qry, IgniteClosure transformer)
>>
>> Would it be possible to utilize it instead of creating a new API?
>>
>> -Val
>>
>> On Wed, Jul 26, 2017 at 5:26 AM, Николай Ижиков
>> wrote:
>>
>>> Hello, Igniters.
>>>
>>> I'm working on the IGNITE-425 [1] issue.
>>> I made a couple of changes in my branch [2], so I want to confirm those
>>> changes with the community before moving forward:
>>>
>>> Text of the issue:
>>>
>>> ```
>>> Currently if updated entry passes the filter, it is sent to node
>>> initiated the query entirely.
>>> It would be good to provide user with the ability to transform entry
>>> and, for example, select only fields that are important. This may bring
>>> huge economy to traffic and lower GC pressure as well.
>>> ```
>>>
>>> 1. I created a new class, ContinuousQueryWithTransformer, that extends
>>> Query:
>>>
>>> Reasons to create an entirely new class without extending
>>> ContinuousQuery:
>>>
>>>     a. ContinuousQuery is final, so a user can't extend it. I don't want
>>>        to change that.
>>>     b. ContinuousQuery contains some deprecated methods
>>>        (setRemoteFilter), so with a new class we can get rid of them.
>>>     c. Such a public API design disallows, at compile time, using the
>>>        existing localEventListener together with the new
>>>        transformedEventListener.
>>>
>>> ```
>>> public final class ContinuousQueryWithTransformer extends Query> {
>>>     public ContinuousQueryWithTransformer
>>>         setRemoteFilterFactory(Factory> rmtFilterFactory) { /**/ }
>>>
>>>     public ContinuousQueryWithTransformer
>>>         setRemoteTransformerFactory(Factory> factory) { /**/ }
>>>
>>>     public ContinuousQueryWithTransformer
>>>         setLocalTransformedEventListener(TransformedEventListener locTransEvtLsnr) { /**/ }
>>>
>>>     public interface TransformedEventListener {
>>>         void onUpdated(Iterable events) throws CacheEntryListenerException;
>>>     }
>>> }
>>> ```
>>>
>>> 2. I want to edit all tests from the package
>>> `core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/`
>>> to ensure my implementation fully supports the existing tests.
>>> I want each test to work for both the regular ContinuousQuery and
>>> ContinuousQueryWithTransformer:
>>>
>>> Existing test:
>>>
>>> ```
>>> ContinuousQuery qry = new ContinuousQuery<>();
>>>
>>> qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
>>>     @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
>>>         for (CacheEntryEvent evt : evts) {
>>>             if ((Integer)evt.getValue() >= 0)
>>>                 evtCnt.incrementAndGet();
>>>         }
>>>     }
>>> });
>>> ```
>>>
>>> To be:
>>>
>>> ```
>>> Query qry = createContinuousQuery();
>>>
>>> setLocalListener(qry, new CI1>() {
>>>     @Override public void apply(T2 e) {
>>>         if ((Integer)e.getValue() >= 0)
>>>             evtCnt.incrementAndGet();
>>>     }
>>> });
>>> ```
>>>
>>> Base class to support setLocalListener:
>>>
>>> ```
>>> protected void setLocalListener(Query q, CI1> lsnrClsr) {
>>>     if (isContinuousWithTransformer()) {
>>>         ((ContinuousQueryWithTransformer)q)
>>>             .setLocalTransformedEventListener(new TransformedEventListenerImpl(lsnrClsr));
>>>     } else
>>>         ((ContinuousQuery)q).setLocalListener(new CacheInvokeListener(lsnrClsr));
>>> }
>>>
>>> protected static class CacheInvokeListener {
>>>     private CI1> clsr;
>>>
>>>     @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> events)
>>>         throws CacheEntryListenerException {
>>>         for (CacheEntryEvent e : events)
>>>             clsr.apply(ignite, new T2<>(e.getKey(), e.getValue()));
>>>     }
>>> }
>>>
>>> protected static class TransformedEventListenerImpl implements TransformedEventListener {
>>>     private IgniteBiInClosure> clsr;
>>>
>>>     @Override public void onUpdated(Iterable evts) throws CacheEntryListenerException {
>>>         for (Object e : evts) {
>>>             clsr.apply((T2)e);
>>>         }
>>>     }
>>> }
>>> ```
>>>
>>> Thoughts?
>>>
>>> [1] https://issues.apache.org/jira/browse/IGNITE-425
>>> [2] https://github.com/nizhikov/ignite/pull/9/files
>>>
>>> --
>>> Nikolay Izhikov
>>> NIzhikov.dev@gmail.com
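
For readers following the thread, the idea under discussion (filter and transform an update where it happens, then deliver only the transformed value to a local listener) can be sketched in plain Java. This is not Ignite code and not the implementation from the branch: `Event`, `SketchQuery`, `TransformedListener`, and `Person` are hypothetical stand-ins invented for illustration, and the "remote" step is simulated in-process.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Illustration only: hypothetical stand-ins, no Ignite classes involved. */
public class TransformerSketch {
    /** Stand-in for a cache update event. */
    static final class Event<K, V> {
        final K key;
        final V value;

        Event(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for the listener that receives already transformed values. */
    interface TransformedListener<T> {
        void onUpdated(Iterable<? extends T> events);
    }

    /** Stand-in for a query that carries a filter, a transformer and a listener. */
    static final class SketchQuery<K, V, T> {
        private final Predicate<Event<K, V>> remoteFilter;
        private final Function<Event<K, V>, T> remoteTransformer;
        private final TransformedListener<T> localListener;

        SketchQuery(Predicate<Event<K, V>> remoteFilter,
            Function<Event<K, V>, T> remoteTransformer,
            TransformedListener<T> localListener) {
            this.remoteFilter = remoteFilter;
            this.remoteTransformer = remoteTransformer;
            this.localListener = localListener;
        }

        /** Simulates the node that owns the data: filter first, then transform. */
        void publish(List<Event<K, V>> updates) {
            List<T> transformed = new ArrayList<>();

            for (Event<K, V> e : updates) {
                if (remoteFilter.test(e))
                    transformed.add(remoteTransformer.apply(e));
            }

            // Only the transformed values would cross the network.
            if (!transformed.isEmpty())
                localListener.onUpdated(transformed);
        }
    }

    /** Example value with one small field and one large field. */
    static final class Person {
        final String name;
        final byte[] photo;

        Person(String name, byte[] photo) {
            this.name = name;
            this.photo = photo;
        }
    }

    /** Runs the sketch and returns what the local listener received. */
    public static List<String> demo() {
        final List<String> received = new ArrayList<>();

        SketchQuery<Integer, Person, String> qry = new SketchQuery<Integer, Person, String>(
            e -> e.key >= 0,      // filter: keep non-negative keys
            e -> e.value.name,    // transformer: keep the name, drop the photo
            names -> {
                for (String n : names)
                    received.add(n);
            });

        qry.publish(Arrays.asList(
            new Event<>(1, new Person("Ann", new byte[1024])),
            new Event<>(-1, new Person("Bob", new byte[1024])),
            new Event<>(2, new Person("Cid", new byte[1024]))));

        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Ann, Cid]
    }
}
```

In this sketch the listener never sees the `photo` bytes, which is the traffic and GC saving the issue text describes. The listener type is also distinct from an entry-level listener, so mixing the two would fail at compile time.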