apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mohit Jotwani <mo...@datatorrent.com>
Subject Re: APEXMALHAR-1701 Deduper in Malhar
Date Mon, 18 Jul 2016 07:11:42 GMT
Dear Community,

+1 to Bhupesh's suggestion.

I would suggest to go ahead with the Managed State and once we have proper
analysis on the windowed operator + large storage backed windowed operator
- we should implement operators such as dedup with it.

Regards,
Mohit

On Mon, Jul 18, 2016 at 12:35 PM, Bhupesh Chawda <bhupesh@apache.org> wrote:

> I can see that Dedup seems like a case where state is continuously merged
> with older state. State in this case is the set of unique tuples. However,
> for Dedup use case, the event windows are, in a way, fixed, and do not
> depend on the incoming tuples. In-coming tuples are just *assigned* to
> these windows. The point I am trying to make is that the older event
> windows will be purged (depending on the lateness configuration and
> watermarks) irrespective of the incoming tuples. Session windows on the
> other hand depend on the incoming tuples and are not fixed, and change with
> incoming data. Perhaps we should not model this use case as a session
> window.
>
> I agree that we cannot decide the approach to be followed with the current
> memory backed storage implementation. Actually, even when we have seen a
> managed state backed implementation for windowed storage, I am worried that
> the interfaces won't still be flexible enough as compared to direct usage
> of managed state and will need custom changes to fit the Dedup use case. I
> am looking at it from the perspective of asynchronous processing which will
> be necessary once we have disk IO involved for processing incoming tuples.
>
> I will suggest we move ahead with the managed state implementation for
> Deduper. We can pick up the Windowed operator based implementation once we
> have all the necessary features like windowed storage backed by managed
> state, input operators with watermark tuple support etc.
>
> Suggestions?
>
> ~ Bhupesh
>
>
> On Mon, Jul 18, 2016 at 11:29 AM, Thomas Weise <thomas@datatorrent.com>
> wrote:
>
> > Hi Bhupesh,
> >
> > Dedup is different with regard to state accumulation. For other windowed
> > operations, we collect state and then emit a result after a period of
> time
> > (trigger or watermark). Here, we only need the state to detect the
> > duplicate. Hence, it is inefficient to collect a list of tuples to
> > determine that a subsequently arriving tuple is a duplicate or not. But
> > isn't this scenario similar to the session window, where state is
> > continuously merged.
> >
> > I would prefer to see more analysis on performance and scalability to
> large
> > key cardinality. The window operator only has the memory backed window
> > store at this time. Until there is a managed state backed implementation
> > that has seen benchmarking, we cannot really use it as baseline for
> further
> > implementations on top of it.
> >
> > Thomas
> >
> >
> > On Thu, Jul 14, 2016 at 7:55 PM, Bhupesh Chawda <bhupesh@apache.org>
> > wrote:
> >
> > > Hi All,
> > >
> > > I also implemented a De-duplication operator using Windowed Operator.
> Now
> > > we have two implementations, one with Managed state and another using
> > > Windowed operator. Here are their details:
> > >
> > >    1. *With Managed State - *
> > >    - The operator is implemented using managed state as the storage for
> > >       buckets into which the tuples will be stored.
> > >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> > >       different buckets based on the event time. It is also used to
> > > identify
> > >       whether a particular tuple is expired and should be sent to the
> > > expired
> > >       port / dropped.
> > >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> > implementation
> > >       is used which just requires the user to specify the event time
> > > and a bucket
> > >       is automatically assigned based on that. The structure of the
> > bucket
> > > data
> > >       on storage is as follows: /operator_id /time_bucket
> > >       - An advantage of using Managed State approach is that we don't
> > have
> > >       to assume the correlation of event time to the de-duplication key
> > of
> > > the
> > >       tuple. For example, if we get two tuples like: (K1, T1), and (K1,
> > > T2), we
> > >       can still use ManagedStateImpl and conclude that these tuples are
> > >       duplicates based on the Key K1.
> > >       2. *With Windowed Operator - *
> > >    - The operator uses the WindowedOperatorImpl as the base operator.
> > >       - Accumulation, for the deduper, basically amounts to storing a
> > list
> > >       of tuples in the data storage. Every time we get a unique tuple,
> we
> > >       *accumulate* it in the list.
> > >       - Event windows are modeled using the *TimeWindow* option.
> Although
> > >       SlidingTimeWIndows seems to be intuitive for data buckets, it
> seems
> > > to be
> > >       the costly option as the accumulation in this case is not just
> > > an aggregate
> > >       value but a list of values in that bucket.
> > >       - Watermarks are not assumed to be sent from an input operator
> > >       (although it is okay if an upstream operator sends them). The
> > >       *fixedWatermark* feature is used to assume watermarks which are
> > >       relative to the window time.
> > >       - One of the issues I found with using WindowedOperator for Dedup
> > is
> > >       that event time is tightly coupled with the de-duplication key.
> In
> > > the
> > >       above example, (K1, T1), and (K1, T2) *might* be concluded as two
> > >       unique tuples since T1 and T2 may fall into two different time
> > > buckets.
> > >
> > > Here are the PRs for both of them.
> > >
> > >    - Using Managed State:
> https://github.com/apache/apex-malhar/pull/335
> > >    - Using Windowed Operator:
> > > https://github.com/apache/apex-malhar/pull/343
> > >
> > > Please review them and suggest on the correct approach for the final
> > > implementation which should be used to add other features like fault
> > > tolerance, scalability, optimizations etc.
> > > Thanks.
> > >
> > > ~ Bhupesh
> > >
> > > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <david@datatorrent.com>
> > wrote:
> > >
> > > > No problem.
> > > >
> > > > By the way, I changed the method name to setFixedWatermark. And also,
> > if
> > > > you want to drop any tuples that are considered late, you need to set
> > the
> > > > allowed lateness to be 0.
> > > >
> > > > David
> > > >
> > > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bhupesh@apache.org>
> > > wrote:
> > > >
> > > > > Thanks David.
> > > > > I'll try to create an implementation for Deduper which uses
> > > > > WindowedOperator. Will open a PR soon for review.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <david@datatorrent.com>
> > > wrote:
> > > > >
> > > > > > Hi Bhupesh,
> > > > > >
> > > > > > I just added the method setFixedLateness(long millis) to
> > > > > > AbstractWindowedOperator in my PR. This will allow you to specify
> > the
> > > > > > lateness with respect to the timestamp from the window ID without
> > > > > watermark
> > > > > > tuples from upstream.
> > > > > >
> > > > > > David
> > > > > >
> > > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <
> david@datatorrent.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Bhupesh,
> > > > > > >
> > > > > > > Yes, the windowed operator currently depends on the watermark
> > > tuples
> > > > > > > upstream for any "lateness" related operation. If there
is no
> > > > > watermark,
> > > > > > > nothing will be considered late. We can add support for
> lateness
> > > > > handling
> > > > > > > without incoming watermark tuples. Let me add that to the
pull
> > > > request.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > > bhupesh@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi David,
> > > > > > >>
> > > > > > >> Thanks for your reply.
> > > > > > >>
> > > > > > >> If I am to use a windowed operator for the Dedup operator,
> there
> > > > > should
> > > > > > be
> > > > > > >> some operator (upstream to Deduper) which sends the
watermark
> > > > tuples.
> > > > > > >> These
> > > > > > >> tuples (along with allowed lateness), will be the ones
> deciding
> > > > which
> > > > > > >> incoming tuples are too late and will be dropped. I
have the
> > > > following
> > > > > > >> questions:
> > > > > > >>
> > > > > > >> Is a windowed operator (which needs watermarks) dependent
upon
> > > some
> > > > > > other
> > > > > > >> operator for these tuples? What happens when there
are no
> > > watermark
> > > > > > tuples
> > > > > > >> sent from upstream?
> > > > > > >>
> > > > > > >> Can a windowed operator "*assume*" the watermark tuples
based
> on
> > > > some
> > > > > > >> notion of time? For example, can the Deduper, use the
> streaming
> > > > window
> > > > > > >> time
> > > > > > >> as the reference to advance the watermark?
> > > > > > >>
> > > > > > >> Thanks.
> > > > > > >>
> > > > > > >> ~ Bhupesh
> > > > > > >>
> > > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> > david@datatorrent.com>
> > > > > > wrote:
> > > > > > >>
> > > > > > >> > Hi Bhupesh,
> > > > > > >> >
> > > > > > >> > FYI, there is a JIRA open for a scalable implementation
of
> > > > > > >> WindowedStorage
> > > > > > >> > and WindowedKeyedStorage:
> > > > > > >> >
> > > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > > >> >
> > > > > > >> > We expect either to use ManagedState directly,
or Spillable
> > > > > > structures,
> > > > > > >> > which in turn uses ManagedState.
> > > > > > >> >
> > > > > > >> > I'm not very familiar with the dedup operator.
but in order
> to
> > > use
> > > > > the
> > > > > > >> > WindowedOperator, it sounds to me that we can
use
> > SlidingWindows
> > > > > with
> > > > > > an
> > > > > > >> > implementation of WindowedKeyedStorage that uses
a Bloom
> > filter
> > > to
> > > > > > cover
> > > > > > >> > most of the false cases.
> > > > > > >> >
> > > > > > >> > David
> > > > > > >> >
> > > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda
<
> > > > bhupesh@apache.org>
> > > > > > >> wrote:
> > > > > > >> >
> > > > > > >> > > Hi All,
> > > > > > >> > >
> > > > > > >> > > I have looked into Windowing concepts from
Apache Beam and
> > the
> > > > PR
> > > > > > >> #319 by
> > > > > > >> > > David. Looks like there are a lot of advanced
concepts
> which
> > > > could
> > > > > > be
> > > > > > >> > used
> > > > > > >> > > by operators using event time windowing.
> > > > > > >> > > Additionally I also looked at the Managed
State
> > > implementation.
> > > > > > >> > >
> > > > > > >> > > One of the things I noticed is that there
is an overlap of
> > > > > > >> functionality
> > > > > > >> > > between Managed State and Windowing Support
in terms of
> the
> > > > > > following:
> > > > > > >> > >
> > > > > > >> > >    - *Discarding / Dropping of tuples* from
the system -
> > > Managed
> > > > > > State
> > > > > > >> > uses
> > > > > > >> > >    the concept of expiry while a Windowed
operator uses
> the
> > > > > concepts
> > > > > > >> of
> > > > > > >> > >    Watermarks and allowed lateness. If I
try to reconcile
> > the
> > > > > above
> > > > > > >> two,
> > > > > > >> > it
> > > > > > >> > >    seems like Managed State (through TimeBucketAssigner)
> is
> > > > trying
> > > > > > to
> > > > > > >> > >    implement some sort of implicit heuristic
Watermarks
> > based
> > > on
> > > > > > >> either
> > > > > > >> > the
> > > > > > >> > >    user supplied time or the event time.
> > > > > > >> > >    - *Global Window* support - Once we have
an option to
> > > disable
> > > > > > >> purging
> > > > > > >> > in
> > > > > > >> > >    Managed State, it will have similar semantics
to the
> > Global
> > > > > > Window
> > > > > > >> > > option
> > > > > > >> > >    in Windowing support.
> > > > > > >> > >
> > > > > > >> > > If I understand correctly, is the suggestion
to implement
> > the
> > > > > Dedup
> > > > > > >> > > operator as a Windowed operator and to use
managed state
> > only
> > > > as a
> > > > > > >> > storage
> > > > > > >> > > medium (through WindowedStorage) ? What could
be a better
> > way
> > > of
> > > > > > going
> > > > > > >> > > about this?
> > > > > > >> > >
> > > > > > >> > > Thanks.
> > > > > > >> > >
> > > > > > >> > > ~ Bhupesh
> > > > > > >> > >
> > > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh
Chawda <
> > > > > > bhupesh@apache.org>
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hi Thomas,
> > > > > > >> > > >
> > > > > > >> > > > I agree that the case of processing
bounded data is a
> > > special
> > > > > case
> > > > > > >> of
> > > > > > >> > > > unbounded data.
> > > > > > >> > > > Th difference I was pointing out was
in terms of expiry.
> > > This
> > > > is
> > > > > > not
> > > > > > >> > > > applicable in case of bounded data sets,
while unbounded
> > > data
> > > > > sets
> > > > > > >> will
> > > > > > >> > > > inherently use expiry for limiting the
amount of data to
> > be
> > > > > > stored.
> > > > > > >> > > >
> > > > > > >> > > > For idempotency when applying expiry
on the streaming
> > data,
> > > I
> > > > > need
> > > > > > >> to
> > > > > > >> > > > explore more on the using the window
timestamp that you
> > > > proposed
> > > > > > as
> > > > > > >> > > opposed
> > > > > > >> > > > to the system time which I was planning
to use.
> > > > > > >> > > >
> > > > > > >> > > > Thanks.
> > > > > > >> > > > ~ Bhupesh
> > > > > > >> > > >
> > > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas
Weise <
> > > > > > >> thomas@datatorrent.com>
> > > > > > >> > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > >> Bhupesh,
> > > > > > >> > > >>
> > > > > > >> > > >> Why is there a distinction between
bounded and
> unbounded
> > > > data?
> > > > > I
> > > > > > >> see
> > > > > > >> > the
> > > > > > >> > > >> former as a special case of the
latter?
> > > > > > >> > > >>
> > > > > > >> > > >> When rewinding the stream or reprocessing
the stream in
> > > > another
> > > > > > run
> > > > > > >> > the
> > > > > > >> > > >> operator should produce the same
result.
> > > > > > >> > > >>
> > > > > > >> > > >> This operator should be idempotent
also. That implies
> > that
> > > > code
> > > > > > >> does
> > > > > > >> > not
> > > > > > >> > > >> rely on current system time but
the window timestamp
> > > instead.
> > > > > > >> > > >>
> > > > > > >> > > >> All of this should be accomplished
by using the
> windowing
> > > > > > support:
> > > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > > >> > > >>
> > > > > > >> > > >> Thanks,
> > > > > > >> > > >> Thomas
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM,
Bhupesh Chawda <
> > > > > > >> > > bhupesh@datatorrent.com>
> > > > > > >> > > >> wrote:
> > > > > > >> > > >>
> > > > > > >> > > >> > Hi All,
> > > > > > >> > > >> >
> > > > > > >> > > >> > I want to validate the use
cases for de-duplication
> > that
> > > > will
> > > > > > be
> > > > > > >> > going
> > > > > > >> > > >> as
> > > > > > >> > > >> > part of this implementation.
> > > > > > >> > > >> >
> > > > > > >> > > >> >    - *Bounded data set*
> > > > > > >> > > >> >       - This is de-duplication
for bounded data. For
> > > > example,
> > > > > > >> data
> > > > > > >> > > sets
> > > > > > >> > > >> >       which are old or fixed
or which may not have a
> > time
> > > > > field
> > > > > > >> at
> > > > > > >> > > >> > all. Example:
> > > > > > >> > > >> >       Last year's transaction
records or Customer
> data
> > > etc.
> > > > > > >> > > >> >       - Concept of expiry is
not needed as this is
> > > bounded
> > > > > data
> > > > > > >> set.
> > > > > > >> > > >> >       - *Unbounded data set*
> > > > > > >> > > >> >       - This is de-duplication
of online streaming
> data
> > > > > > >> > > >> >       - Expiry is needed because
here incoming tuples
> > may
> > > > > > arrive
> > > > > > >> > later
> > > > > > >> > > >> than
> > > > > > >> > > >> >       what they are expected.
Expiry is always
> computed
> > > by
> > > > > > taking
> > > > > > >> > the
> > > > > > >> > > >> > difference
> > > > > > >> > > >> >       in System time and the
Event time.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Any feedback is appreciated.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Thanks.
> > > > > > >> > > >> >
> > > > > > >> > > >> > ~ Bhupesh
> > > > > > >> > > >> >
> > > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34
AM, Bhupesh Chawda <
> > > > > > >> > > >> bhupesh@datatorrent.com>
> > > > > > >> > > >> > wrote:
> > > > > > >> > > >> >
> > > > > > >> > > >> > > Hi All,
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > I am working on adding
a De-duplication operator in
> > > > Malhar
> > > > > > >> library
> > > > > > >> > > >> based
> > > > > > >> > > >> > > on managed state APIs.
I will be working off the
> > > already
> > > > > > >> created
> > > > > > >> > > JIRA
> > > > > > >> > > >> -
> > > > > > >> > > >> > >
> > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > > and
> > > > > > the
> > > > > > >> > > initial
> > > > > > >> > > >> > > pull request for an AbstractDeduper
here:
> > > > > > >> > > >> > >
> https://github.com/apache/apex-malhar/pull/260/files
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > I am planning to include
the following features in
> > the
> > > > > first
> > > > > > >> > > version:
> > > > > > >> > > >> > > 1. Time based de-duplication.
Assumption: Tuple_Key
> > ->
> > > > > > >> Tuple_Time
> > > > > > >> > > >> > > correlation holds.
> > > > > > >> > > >> > > 2. Option to maintain
order of incoming tuples.
> > > > > > >> > > >> > > 3. Duplicate and Expired
ports to emit duplicate
> and
> > > > > expired
> > > > > > >> > tuples
> > > > > > >> > > >> > > respectively.
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > Thanks.
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > ~ Bhupesh
> > > > > > >> > > >> > >
> > > > > > >> > > >> >
> > > > > > >> > > >>
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message