From: Bhupesh Chawda
Date: Thu, 7 Jul 2016 11:18:42 +0530
Subject: Re: APEXMALHAR-1701 Deduper in Malhar
To: dev
Reply-To: dev@apex.apache.org

Hi David,

Thanks for your reply.

If I am to use a windowed operator for the Dedup operator, there has to be
some operator (upstream of the Deduper) that sends the watermark tuples.
These tuples, together with the allowed lateness, will decide which incoming
tuples are too late and will be dropped. I have the following questions:

Is a windowed operator (which needs watermarks) dependent on some other
operator for these tuples? What happens when no watermark tuples are sent
from upstream?

Can a windowed operator "*assume*" the watermark tuples based on some notion
of time? For example, can the Deduper use the streaming window time as the
reference to advance the watermark?

Thanks.

~ Bhupesh

On Thu, Jul 7, 2016 at 4:07 AM, David Yan wrote:

> Hi Bhupesh,
>
> FYI, there is a JIRA open for a scalable implementation of WindowedStorage
> and WindowedKeyedStorage:
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2130
>
> We expect to use either ManagedState directly or Spillable structures,
> which in turn use ManagedState.
>
> I'm not very familiar with the dedup operator, but in order to use the
> WindowedOperator, it sounds to me that we can use SlidingWindows with an
> implementation of WindowedKeyedStorage that uses a Bloom filter to cover
> most of the false (not-a-duplicate) cases.
>
> David
>
> On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda wrote:
>
> > Hi All,
> >
> > I have looked into the Windowing concepts from Apache Beam and PR #319 by
> > David. It looks like there are a lot of advanced concepts that could be
> > used by operators doing event-time windowing.
> > Additionally, I also looked at the Managed State implementation.
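David's suggestion above, a Bloom filter in front of the keyed storage, can be illustrated with a minimal Python sketch. This is not the Malhar WindowedKeyedStorage API: `BloomFilter` and `BloomBackedKeyStore` are hypothetical names, and a plain `set` stands in for ManagedState-backed storage. Because a Bloom filter never gives false negatives, a "definitely not seen" answer (the common case for non-duplicates) skips the exact lookup entirely; only possible hits fall through to the slower store.

```python
import hashlib


class BloomFilter:
    """Fixed-size Bloom filter: no false negatives, some false positives."""

    def __init__(self, num_bits: int, num_hashes: int) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key: str):
        # Derive k bit positions from one SHA-256 digest (double hashing).
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # odd step
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


class BloomBackedKeyStore:
    """Hypothetical key store: the filter answers most 'not seen' lookups,
    and only possible hits are checked against the exact store."""

    def __init__(self, num_bits: int = 1 << 16, num_hashes: int = 4) -> None:
        self.bloom = BloomFilter(num_bits, num_hashes)
        self.exact = set()       # stand-in for ManagedState-backed storage
        self.exact_lookups = 0   # how often the slow path was needed

    def seen_before(self, key: str) -> bool:
        """Return True if key was already recorded; otherwise record it."""
        if self.bloom.might_contain(key):
            self.exact_lookups += 1
            if key in self.exact:
                return True
        # Either the filter ruled it out or it was a false positive.
        self.bloom.add(key)
        self.exact.add(key)
        return False
```

A false positive only costs one extra exact lookup; it never turns a new key into a reported duplicate, so the answers from `seen_before` stay exact.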
> >
> > One of the things I noticed is that there is an overlap of functionality
> > between Managed State and Windowing Support in the following areas:
> >
> > - *Discarding / dropping of tuples* from the system: Managed State uses
> >   the concept of expiry, while a Windowed operator uses the concepts of
> >   Watermarks and allowed lateness. If I try to reconcile the two, it
> >   seems that Managed State (through TimeBucketAssigner) implements a sort
> >   of implicit, heuristic Watermark based on either the user-supplied time
> >   or the event time.
> > - *Global Window* support: once we have an option to disable purging in
> >   Managed State, it will have semantics similar to the Global Window
> >   option in Windowing support.
> >
> > If I understand correctly, is the suggestion to implement the Dedup
> > operator as a Windowed operator and to use Managed State only as a
> > storage medium (through WindowedStorage)? What would be a better way of
> > going about this?
> >
> > Thanks.
> >
> > ~ Bhupesh
> >
> > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda wrote:
> >
> > > Hi Thomas,
> > >
> > > I agree that processing bounded data is a special case of processing
> > > unbounded data.
> > > The difference I was pointing out was in terms of expiry. Expiry is not
> > > applicable to bounded data sets, while unbounded data sets will
> > > inherently use expiry to limit the amount of data to be stored.
> > >
> > > For idempotency when applying expiry to the streaming data, I need to
> > > explore using the window timestamp that you proposed, as opposed to the
> > > system time that I was planning to use.
> > >
> > > Thanks.
> > > ~ Bhupesh
> > >
> > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise wrote:
> > >
> > >> Bhupesh,
> > >>
> > >> Why is there a distinction between bounded and unbounded data? I see
> > >> the former as a special case of the latter.
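The question of deriving a watermark from the streaming window time, and the comparison with Managed State's time-bucket expiry, can be illustrated with a small, framework-independent Python sketch. It is not the Apex WindowedOperator or TimeBucketAssigner API: `WindowTimeWatermark` and `bucket_expired` are hypothetical names. It assumes the window timestamp is used directly as the watermark, and it shows bucket-based expiry acting as a coarser version of the same cut-off, because a whole bucket expires at once.

```python
from typing import Optional


class WindowTimeWatermark:
    """Hypothetical sketch: the streaming window timestamp stands in for
    watermark tuples sent from upstream."""

    def __init__(self, allowed_lateness_ms: int) -> None:
        self.allowed_lateness_ms = allowed_lateness_ms
        self.watermark_ms: Optional[int] = None  # none until the first window

    def begin_window(self, window_timestamp_ms: int) -> None:
        # A watermark must never move backwards.
        if self.watermark_ms is None or window_timestamp_ms > self.watermark_ms:
            self.watermark_ms = window_timestamp_ms

    def is_too_late(self, event_time_ms: int) -> bool:
        # Too late once the event time falls behind the watermark by more
        # than the allowed lateness.
        if self.watermark_ms is None:
            return False
        return event_time_ms < self.watermark_ms - self.allowed_lateness_ms


def bucket_expired(event_time_ms: int, reference_time_ms: int,
                   expiry_ms: int, bucket_span_ms: int) -> bool:
    """Time-bucket style expiry: the cut-off is rounded down to a bucket
    boundary, so tuples just past it may survive until their bucket expires."""
    oldest_live_bucket = (reference_time_ms - expiry_ms) // bucket_span_ms
    return event_time_ms // bucket_span_ms < oldest_live_bucket
```

For example, with 10 s of allowed lateness and a window at t = 110 s, a tuple with event time 95 s is past the watermark cut-off (100 s), but with 30 s buckets it still sits in the live bucket [90 s, 120 s), so the bucket scheme keeps it.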
> > >>
> > >> When rewinding the stream or reprocessing the stream in another run, the
> > >> operator should produce the same result.
> > >>
> > >> This operator should also be idempotent. That implies that the code must
> > >> not rely on the current system time but on the window timestamp instead.
> > >>
> > >> All of this should be accomplished by using the windowing support:
> > >> https://github.com/apache/apex-malhar/pull/319
> > >>
> > >> Thanks,
> > >> Thomas
> > >>
> > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >> wrote:
> > >>
> > >> > Hi All,
> > >> >
> > >> > I want to validate the use cases for de-duplication that will go into
> > >> > this implementation.
> > >> >
> > >> > - *Bounded data set*
> > >> >    - This is de-duplication of bounded data: for example, data sets
> > >> >      that are old or fixed, or that may not have a time field at all,
> > >> >      such as last year's transaction records or customer data.
> > >> >    - The concept of expiry is not needed, as this is a bounded data set.
> > >> > - *Unbounded data set*
> > >> >    - This is de-duplication of online streaming data.
> > >> >    - Expiry is needed because incoming tuples may arrive later than
> > >> >      expected. Expiry is always computed by taking the difference
> > >> >      between the system time and the event time.
> > >> >
> > >> > Any feedback is appreciated.
> > >> >
> > >> > Thanks.
> > >> >
> > >> > ~ Bhupesh
> > >> >
> > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >> > wrote:
> > >> >
> > >> > > Hi All,
> > >> > >
> > >> > > I am working on adding a De-duplication operator to the Malhar library
> > >> > > based on the managed state APIs.
> > >> > > I will be working off the already created JIRA,
> > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701, and the initial
> > >> > > pull request for an AbstractDeduper here:
> > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > >> > >
> > >> > > I am planning to include the following features in the first version:
> > >> > > 1. Time-based de-duplication. Assumption: the Tuple_Key -> Tuple_Time
> > >> > >    correlation holds.
> > >> > > 2. An option to maintain the order of incoming tuples.
> > >> > > 3. Duplicate and Expired ports to emit duplicate and expired tuples,
> > >> > >    respectively.
> > >> > >
> > >> > > Thanks.
> > >> > >
> > >> > > ~ Bhupesh
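The features listed in the first message, together with Thomas's idempotency point, can be combined into a minimal sketch of a time-based deduper with unique, duplicate, and expired outputs. This is plain Python, not the Malhar AbstractDeduper or Managed State API: `Event`, `DedupOutputs`, `TimeBasedDeduper`, and `run` are hypothetical names. It assumes the Tuple_Key -> Tuple_Time correlation holds (a duplicate carries the same event time as the original) and that window timestamps never decrease. Expiry is measured against the window timestamp rather than the system time, so replaying the same windows yields the same outputs, and `expiry_ms=None` treats the input as bounded, so nothing ever expires.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, List, Optional, Tuple


@dataclass(frozen=True)
class Event:
    key: str
    event_time_ms: int


@dataclass
class DedupOutputs:
    """Stand-ins for the unique, duplicate and expired output ports."""
    unique: List[Event] = field(default_factory=list)
    duplicate: List[Event] = field(default_factory=list)
    expired: List[Event] = field(default_factory=list)


class TimeBasedDeduper:
    def __init__(self, expiry_ms: Optional[int]) -> None:
        # None models bounded data: keys are kept forever (no expiry).
        self.expiry_ms = expiry_ms
        self.window_ms = 0
        # Tuple_Key -> Tuple_Time for every key seen and not yet purged.
        self.seen: Dict[str, int] = {}

    def _cutoff(self) -> Optional[int]:
        # Oldest accepted event time, derived from the window timestamp and
        # never from the wall clock, so a replay makes identical decisions.
        if self.expiry_ms is None:
            return None
        return self.window_ms - self.expiry_ms

    def begin_window(self, window_timestamp_ms: int) -> None:
        self.window_ms = window_timestamp_ms
        cutoff = self._cutoff()
        if cutoff is not None:
            # Safe to purge: with the key -> time correlation, any later
            # duplicate of a purged key is older than the cut-off and expires.
            self.seen = {k: t for k, t in self.seen.items() if t >= cutoff}

    def process(self, event: Event, out: DedupOutputs) -> None:
        cutoff = self._cutoff()
        if cutoff is not None and event.event_time_ms < cutoff:
            out.expired.append(event)
        elif event.key in self.seen:
            out.duplicate.append(event)
        else:
            self.seen[event.key] = event.event_time_ms
            out.unique.append(event)


def run(make_deduper: Callable[[], TimeBasedDeduper],
        windows: Iterable[Tuple[int, List[Event]]]) -> DedupOutputs:
    """Feed (window timestamp, events) pairs through a fresh deduper."""
    deduper = make_deduper()
    out = DedupOutputs()
    for window_ms, events in windows:
        deduper.begin_window(window_ms)
        for event in events:
            deduper.process(event, out)
    return out
```

Keeping the bounded case as the same operator with expiry switched off follows Thomas's view of bounded data as a special case of unbounded data; ordering of tuples (feature 2) is not modelled here.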