apex-dev mailing list archives

From Bhupesh Chawda <bhup...@apache.org>
Subject Re: APEXMALHAR-1701 Deduper in Malhar
Date Fri, 08 Jul 2016 11:55:22 GMT
Thanks David.
I'll try to create an implementation for Deduper which uses
WindowedOperator. Will open a PR soon for review.

~ Bhupesh

On Fri, Jul 8, 2016 at 2:23 AM, David Yan <david@datatorrent.com> wrote:

> Hi Bhupesh,
>
> I just added the method setFixedLateness(long millis) to
> AbstractWindowedOperator in my PR. This will allow you to specify the
> lateness with respect to the timestamp from the window ID without watermark
> tuples from upstream.
>
> David
>
> On Thu, Jul 7, 2016 at 11:49 AM, David Yan <david@datatorrent.com> wrote:
>
> > Hi Bhupesh,
> >
> > Yes, the windowed operator currently depends on the watermark tuples
> > upstream for any "lateness" related operation. If there is no watermark,
> > nothing will be considered late. We can add support for lateness handling
> > without incoming watermark tuples. Let me add that to the pull request.
> >
> > David
> >
> >
> > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <bhupesh@apache.org>
> > wrote:
> >
> >> Hi David,
> >>
> >> Thanks for your reply.
> >>
> >> If I am to use a windowed operator for the Dedup operator, there should be
> >> some operator (upstream to Deduper) which sends the watermark tuples. These
> >> tuples (along with allowed lateness) will be the ones deciding which
> >> incoming tuples are too late and will be dropped. I have the following
> >> questions:
> >>
> >> Is a windowed operator (which needs watermarks) dependent upon some other
> >> operator for these tuples? What happens when there are no watermark tuples
> >> sent from upstream?
> >>
> >> Can a windowed operator "*assume*" the watermark tuples based on some
> >> notion of time? For example, can the Deduper use the streaming window time
> >> as the reference to advance the watermark?
> >>
> >> Thanks.
> >>
> >> ~ Bhupesh
> >>
> >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <david@datatorrent.com> wrote:
> >>
> >> > Hi Bhupesh,
> >> >
> >> > FYI, there is a JIRA open for a scalable implementation of WindowedStorage
> >> > and WindowedKeyedStorage:
> >> >
> >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> >> >
> >> > We expect either to use ManagedState directly, or Spillable structures,
> >> > which in turn use ManagedState.
> >> >
> >> > I'm not very familiar with the dedup operator, but in order to use the
> >> > WindowedOperator, it sounds to me that we can use SlidingWindows with an
> >> > implementation of WindowedKeyedStorage that uses a Bloom filter to cover
> >> > most of the false cases.
> >> >
> >> > David
> >> >
> >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <bhupesh@apache.org> wrote:
> >> >
> >> > > Hi All,
> >> > >
> >> > > I have looked into Windowing concepts from Apache Beam and the PR #319 by
> >> > > David. Looks like there are a lot of advanced concepts which could be used
> >> > > by operators using event time windowing.
> >> > > Additionally I also looked at the Managed State implementation.
> >> > >
> >> > > One of the things I noticed is that there is an overlap of functionality
> >> > > between Managed State and Windowing Support in terms of the following:
> >> > >
> >> > >    - *Discarding / Dropping of tuples* from the system - Managed State uses
> >> > >    the concept of expiry while a Windowed operator uses the concepts of
> >> > >    Watermarks and allowed lateness. If I try to reconcile the above two, it
> >> > >    seems like Managed State (through TimeBucketAssigner) is trying to
> >> > >    implement some sort of implicit heuristic Watermarks based on either the
> >> > >    user supplied time or the event time.
> >> > >    - *Global Window* support - Once we have an option to disable purging in
> >> > >    Managed State, it will have similar semantics to the Global Window option
> >> > >    in Windowing support.
> >> > >
> >> > > If I understand correctly, is the suggestion to implement the Dedup
> >> > > operator as a Windowed operator and to use managed state only as a storage
> >> > > medium (through WindowedStorage)? What could be a better way of going
> >> > > about this?
> >> > >
> >> > > Thanks.
> >> > >
> >> > > ~ Bhupesh
> >> > >
> >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <bhupesh@apache.org> wrote:
> >> > >
> >> > > > Hi Thomas,
> >> > > >
> >> > > > I agree that the case of processing bounded data is a special case of
> >> > > > unbounded data.
> >> > > > The difference I was pointing out was in terms of expiry. This is not
> >> > > > applicable in case of bounded data sets, while unbounded data sets will
> >> > > > inherently use expiry for limiting the amount of data to be stored.
> >> > > >
> >> > > > For idempotency when applying expiry on the streaming data, I need to
> >> > > > explore more on using the window timestamp that you proposed, as opposed
> >> > > > to the system time which I was planning to use.
> >> > > >
> >> > > > Thanks.
> >> > > > ~ Bhupesh
> >> > > >
> >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <thomas@datatorrent.com> wrote:
> >> > > >
> >> > > >> Bhupesh,
> >> > > >>
> >> > > >> Why is there a distinction between bounded and unbounded data? I see the
> >> > > >> former as a special case of the latter?
> >> > > >>
> >> > > >> When rewinding the stream or reprocessing the stream in another run the
> >> > > >> operator should produce the same result.
> >> > > >>
> >> > > >> This operator should be idempotent also. That implies that code does not
> >> > > >> rely on current system time but the window timestamp instead.
> >> > > >>
> >> > > >> All of this should be accomplished by using the windowing support:
> >> > > >> https://github.com/apache/apex-malhar/pull/319
> >> > > >>
> >> > > >> Thanks,
> >> > > >> Thomas
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> >> > > >>
> >> > > >> > Hi All,
> >> > > >> >
> >> > > >> > I want to validate the use cases for de-duplication that will be going as
> >> > > >> > part of this implementation.
> >> > > >> >
> >> > > >> >    - *Bounded data set*
> >> > > >> >       - This is de-duplication for bounded data. For example, data sets
> >> > > >> >       which are old or fixed or which may not have a time field at all.
> >> > > >> >       Example: Last year's transaction records or Customer data etc.
> >> > > >> >       - Concept of expiry is not needed as this is a bounded data set.
> >> > > >> >    - *Unbounded data set*
> >> > > >> >       - This is de-duplication of online streaming data
> >> > > >> >       - Expiry is needed because here incoming tuples may arrive later than
> >> > > >> >       they are expected. Expiry is always computed by taking the difference
> >> > > >> >       in System time and the Event time.
> >> > > >> >
> >> > > >> > Any feedback is appreciated.
> >> > > >> >
> >> > > >> > Thanks.
> >> > > >> >
> >> > > >> > ~ Bhupesh
> >> > > >> >
> >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> >> > > >> >
> >> > > >> > > Hi All,
> >> > > >> > >
> >> > > >> > > I am working on adding a De-duplication operator in Malhar library based
> >> > > >> > > on managed state APIs. I will be working off the already created JIRA -
> >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial
> >> > > >> > > pull request for an AbstractDeduper here:
> >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> >> > > >> > >
> >> > > >> > > I am planning to include the following features in the first version:
> >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
> >> > > >> > > correlation holds.
> >> > > >> > > 2. Option to maintain order of incoming tuples.
> >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and expired tuples
> >> > > >> > > respectively.
> >> > > >> > >
> >> > > >> > > Thanks.
> >> > > >> > >
> >> > > >> > > ~ Bhupesh
> >> > > >> > >
> >> > > >> >
> >> > > >>
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
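
The idea running through this thread (time-based de-duplication with a fixed lateness measured against the window timestamp rather than system time, so that replaying the stream gives the same result) can be sketched in plain Java. This is only an illustration under stated assumptions and does not use the Malhar WindowedOperator or ManagedState APIs. The class FixedLatenessDeduper, its methods beginWindow and process, and the Result enum are hypothetical names invented for the sketch. It also assumes the Tuple_Key -> Tuple_Time correlation mentioned in the thread, so a key whose event time has fallen behind the watermark can be evicted safely.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the Malhar API: a deduper that derives its
// watermark from the streaming window timestamp minus a fixed lateness.
final class FixedLatenessDeduper<K> {

  enum Result { UNIQUE, DUPLICATE, EXPIRED }

  private final long fixedLatenessMillis;
  // Key -> event time of the first tuple seen with that key.
  private final Map<K, Long> seen = new HashMap<>();
  private long watermarkMillis = Long.MIN_VALUE;

  FixedLatenessDeduper(long fixedLatenessMillis) {
    this.fixedLatenessMillis = fixedLatenessMillis;
  }

  // Called once per streaming window with the timestamp derived from the
  // window ID. No system clock is consulted, so a replay is deterministic.
  void beginWindow(long windowTimestampMillis) {
    long candidate = windowTimestampMillis - fixedLatenessMillis;
    if (candidate > watermarkMillis) {
      watermarkMillis = candidate;
      // Evict keys older than the watermark. Relies on the assumption that
      // any later duplicate carries the same event time and is thus expired.
      seen.values().removeIf(eventTime -> eventTime < watermarkMillis);
    }
  }

  // Classifies one tuple by key and event time.
  Result process(K key, long eventTimeMillis) {
    if (eventTimeMillis < watermarkMillis) {
      return Result.EXPIRED;
    }
    if (seen.putIfAbsent(key, eventTimeMillis) != null) {
      return Result.DUPLICATE;
    }
    return Result.UNIQUE;
  }
}
```

In an operator, the three Result values would map onto the unique, duplicate and expired output ports proposed in the first message of the thread. A scalable version would replace the in-memory HashMap with the storage discussed above.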
