From: Bhupesh Chawda
Date: Fri, 8 Jul 2016 17:25:22 +0530
Subject: Re: APEXMALHAR-1701 Deduper in Malhar
To: dev
Reply-To: dev@apex.apache.org
Mailing-List: contact dev-help@apex.apache.org; run by ezmlm
MIME-Version: 1.0
Content-Type: multipart/alternative;
 boundary=94eb2c06247063e65305371e7ba8
archived-at: Fri, 08 Jul 2016 11:55:45 -0000

--94eb2c06247063e65305371e7ba8
Content-Type: text/plain; charset=UTF-8

Thanks David.
I'll try to create an implementation for Deduper which uses
WindowedOperator. Will open a PR soon for review.

~ Bhupesh

On Fri, Jul 8, 2016 at 2:23 AM, David Yan wrote:

> Hi Bhupesh,
>
> I just added the method setFixedLateness(long millis) to
> AbstractWindowedOperator in my PR. This will allow you to specify the
> lateness with respect to the timestamp from the window ID without
> watermark tuples from upstream.
>
> David
>
> On Thu, Jul 7, 2016 at 11:49 AM, David Yan wrote:
>
> > Hi Bhupesh,
> >
> > Yes, the windowed operator currently depends on the watermark tuples
> > upstream for any "lateness" related operation. If there is no
> > watermark, nothing will be considered late. We can add support for
> > lateness handling without incoming watermark tuples. Let me add that
> > to the pull request.
> >
> > David
> >
> > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda wrote:
> >
> >> Hi David,
> >>
> >> Thanks for your reply.
> >>
> >> If I am to use a windowed operator for the Dedup operator, there
> >> should be some operator (upstream to Deduper) which sends the
> >> watermark tuples. These tuples (along with allowed lateness) will be
> >> the ones deciding which incoming tuples are too late and will be
> >> dropped. I have the following questions:
> >>
> >> Is a windowed operator (which needs watermarks) dependent upon some
> >> other operator for these tuples? What happens when there are no
> >> watermark tuples sent from upstream?
> >>
> >> Can a windowed operator "*assume*" the watermark tuples based on some
> >> notion of time? For example, can the Deduper use the streaming window
> >> time as the reference to advance the watermark?
> >>
> >> Thanks.
> >>
> >> ~ Bhupesh
> >>
> >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan wrote:
> >>
> >> > Hi Bhupesh,
> >> >
> >> > FYI, there is a JIRA open for a scalable implementation of
> >> > WindowedStorage and WindowedKeyedStorage:
> >> >
> >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> >> >
> >> > We expect either to use ManagedState directly, or Spillable
> >> > structures, which in turn use ManagedState.
> >> >
> >> > I'm not very familiar with the dedup operator, but in order to use
> >> > the WindowedOperator, it sounds to me that we can use
> >> > SlidingWindows with an implementation of WindowedKeyedStorage that
> >> > uses a Bloom filter to cover most of the false cases.
> >> >
> >> > David
> >> >
> >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda wrote:
> >> >
> >> > > Hi All,
> >> > >
> >> > > I have looked into Windowing concepts from Apache Beam and the PR
> >> > > #319 by David. Looks like there are a lot of advanced concepts
> >> > > which could be used by operators using event time windowing.
> >> > > Additionally I also looked at the Managed State implementation.
> >> > >
> >> > > One of the things I noticed is that there is an overlap of
> >> > > functionality between Managed State and Windowing Support in
> >> > > terms of the following:
> >> > >
> >> > >    - *Discarding / Dropping of tuples* from the system - Managed
> >> > >    State uses the concept of expiry while a Windowed operator
> >> > >    uses the concepts of Watermarks and allowed lateness. If I try
> >> > >    to reconcile the above two, it seems like Managed State
> >> > >    (through TimeBucketAssigner) is trying to implement some sort
> >> > >    of implicit heuristic Watermarks based on either the user
> >> > >    supplied time or the event time.
> >> > >    - *Global Window* support - Once we have an option to disable
> >> > >    purging in Managed State, it will have similar semantics to
> >> > >    the Global Window option in Windowing support.
> >> > >
> >> > > If I understand correctly, is the suggestion to implement the
> >> > > Dedup operator as a Windowed operator and to use managed state
> >> > > only as a storage medium (through WindowedStorage)? What could be
> >> > > a better way of going about this?
> >> > >
> >> > > Thanks.
> >> > >
> >> > > ~ Bhupesh
> >> > >
> >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <bhupesh@apache.org> wrote:
> >> > >
> >> > > > Hi Thomas,
> >> > > >
> >> > > > I agree that the case of processing bounded data is a special
> >> > > > case of unbounded data.
> >> > > > The difference I was pointing out was in terms of expiry. This
> >> > > > is not applicable in case of bounded data sets, while unbounded
> >> > > > data sets will inherently use expiry for limiting the amount of
> >> > > > data to be stored.
> >> > > >
> >> > > > For idempotency when applying expiry on the streaming data, I
> >> > > > need to explore further the use of the window timestamp that
> >> > > > you proposed, as opposed to the system time which I was
> >> > > > planning to use.
> >> > > >
> >> > > > Thanks.
> >> > > > ~ Bhupesh
> >> > > >
> >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <thomas@datatorrent.com> wrote:
> >> > > >
> >> > > >> Bhupesh,
> >> > > >>
> >> > > >> Why is there a distinction between bounded and unbounded data?
> >> > > >> I see the former as a special case of the latter?
> >> > > >>
> >> > > >> When rewinding the stream or reprocessing the stream in
> >> > > >> another run the operator should produce the same result.
> >> > > >>
> >> > > >> This operator should be idempotent also.
> >> > > >> That implies that the code does not rely on the current
> >> > > >> system time but on the window timestamp instead.
> >> > > >>
> >> > > >> All of this should be accomplished by using the windowing
> >> > > >> support:
> >> > > >> https://github.com/apache/apex-malhar/pull/319
> >> > > >>
> >> > > >> Thanks,
> >> > > >> Thomas
> >> > > >>
> >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> >> > > >>
> >> > > >> > Hi All,
> >> > > >> >
> >> > > >> > I want to validate the use cases for de-duplication that
> >> > > >> > will be going in as part of this implementation.
> >> > > >> >
> >> > > >> >    - *Bounded data set*
> >> > > >> >       - This is de-duplication for bounded data. For
> >> > > >> >       example, data sets which are old or fixed or which may
> >> > > >> >       not have a time field at all. Example: Last year's
> >> > > >> >       transaction records or Customer data etc.
> >> > > >> >       - Concept of expiry is not needed as this is a bounded
> >> > > >> >       data set.
> >> > > >> >    - *Unbounded data set*
> >> > > >> >       - This is de-duplication of online streaming data.
> >> > > >> >       - Expiry is needed because here incoming tuples may
> >> > > >> >       arrive later than expected. Expiry is always computed
> >> > > >> >       by taking the difference between the System time and
> >> > > >> >       the Event time.
> >> > > >> >
> >> > > >> > Any feedback is appreciated.
> >> > > >> >
> >> > > >> > Thanks.
> >> > > >> >
> >> > > >> > ~ Bhupesh
> >> > > >> >
> >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> >> > > >> >
> >> > > >> > > Hi All,
> >> > > >> > >
> >> > > >> > > I am working on adding a De-duplication operator in Malhar
> >> > > >> > > library based on managed state APIs.
> >> > > >> > > I will be working off the already created JIRA -
> >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and
> >> > > >> > > the initial pull request for an AbstractDeduper here:
> >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> >> > > >> > >
> >> > > >> > > I am planning to include the following features in the
> >> > > >> > > first version:
> >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> >> > > >> > > Tuple_Time correlation holds.
> >> > > >> > > 2. Option to maintain order of incoming tuples.
> >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> >> > > >> > > expired tuples respectively.
> >> > > >> > >
> >> > > >> > > Thanks.
> >> > > >> > >
> >> > > >> > > ~ Bhupesh

--94eb2c06247063e65305371e7ba8--
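
The time-based de-duplication discussed in this thread (duplicates detected by key, expiry measured against the window timestamp rather than the system time, and separate outputs for duplicate and expired tuples) can be sketched roughly as follows. This is a minimal plain-Java sketch under stated assumptions, not the Malhar Deduper or the WindowedOperator API: the class TimeBasedDeduper, its methods, and the Decision enum are hypothetical names, and deriving the watermark as "window timestamp minus a fixed lateness" is only one reading of the setFixedLateness(long millis) description quoted above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names come from Apex or Malhar. */
class TimeBasedDeduper {
  /** Stands in for the unique, duplicate and expired output ports. */
  enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  // Assumed fixed allowed lateness, in milliseconds.
  private final long fixedLatenessMillis;
  // Key -> event time of the first tuple seen with that key.
  private final Map<String, Long> seen = new HashMap<>();
  // Implicit watermark; tuples older than this are treated as expired.
  private long watermarkMillis = Long.MIN_VALUE;

  TimeBasedDeduper(long fixedLatenessMillis) {
    this.fixedLatenessMillis = fixedLatenessMillis;
  }

  /**
   * Advances the watermark from the streaming window timestamp.
   * No system clock is read, so replaying the same windows and
   * tuples yields the same decisions.
   */
  void beginWindow(long windowTimestampMillis) {
    // The watermark never moves backwards.
    watermarkMillis =
        Math.max(watermarkMillis, windowTimestampMillis - fixedLatenessMillis);
    // Drop state for keys that can no longer be matched.
    seen.values().removeIf(eventTime -> eventTime < watermarkMillis);
  }

  /** Classifies one tuple by its key and event time. */
  Decision process(String key, long eventTimeMillis) {
    if (eventTimeMillis < watermarkMillis) {
      return Decision.EXPIRED;
    }
    if (seen.putIfAbsent(key, eventTimeMillis) != null) {
      return Decision.DUPLICATE;
    }
    return Decision.UNIQUE;
  }

  public static void main(String[] args) {
    TimeBasedDeduper deduper = new TimeBasedDeduper(1_000L);
    deduper.beginWindow(10_000L);                        // watermark = 9000
    System.out.println(deduper.process("a", 9_500L));    // UNIQUE
    System.out.println(deduper.process("a", 9_500L));    // DUPLICATE
    System.out.println(deduper.process("b", 8_000L));    // EXPIRED
    deduper.beginWindow(11_000L);                        // watermark = 10000
    System.out.println(deduper.process("a", 9_500L));    // EXPIRED
  }
}
```

The sketch keeps all state in an in-memory map purely for brevity; the thread itself points at ManagedState or Spillable structures for scalable storage, and the Bloom filter idea is not modelled here.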