kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-478 Strongly Typed Processor API
Date Tue, 25 Jun 2019 02:22:23 GMT
Hi John,

Yeah, I agree we should not do all the repackaging as part of this KIP
(we can just move Processor / ProcessorSupplier), but I think we need to
discuss the end goal here; otherwise we may repackage Processor in this
KIP, only to realize later that the other repackagings are not our
preferred solutions.


Guozhang

On Mon, Jun 24, 2019 at 3:06 PM John Roesler <john@confluent.io> wrote:

> Hey Guozhang,
>
> Thanks for the idea! I'm wondering if we could take a middle ground
> and take your proposed layout as a "roadmap", while only actually
> moving the classes that are already involved in this KIP.
>
> The reason I ask is not just to control the scope of this KIP, but
> also, I think that if we move other classes to new packages, we might
> also want to take the opportunity to clean up other things about them.
> But each one of those would become a discussion point of its own, so
> it seems the discussion would become intractable. FWIW, I do like your
> idea for precisely this reason, it creates opportunities for us to
> consider other changes that we are simply not able to make without
> breaking source compatibility.
>
> If the others feel "kind of favorable" toward this overall vision, maybe
> we can create one or more Jira tickets to capture it, and then just
> alter _this_ proposal to use `processor.api.Processor` (etc.).
>
> WDYT?
> -John
>
> On Sun, Jun 23, 2019 at 7:17 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > Hello John,
> >
> > Thanks for your detailed explanation. I've done some quick checks on some
> > existing examples that heavily use Processor, and the results also make
> > me worried about my previous statement that "the breakage would not be
> > big". I agree we should maintain compatibility.
> >
> > About the naming itself, I'm actually a bit more inclined toward
> > sub-packages than renamed new classes. My motivation is that our current
> > packaging is already quite coarse-grained and sometimes ill-placed, so
> > maybe we can take this change along with some cleanup of the packages
> > (but again, we should follow the deprecate-then-remove path). What I'm
> > thinking is:
> >
> > -------------------
> >
> > processor/: StateRestoreCallback/AbstractNotifyingRestoreCallback (to be
> > deprecated later, as are the other struck-through names),
> > ProcessorContext, RecordContext, Punctuator, PunctuationType, To,
> > Cancellable (these are the only things left)
> >
> > (new) processor/api/: Processor, ProcessorSupplier (and of course, these
> > two classes can be strongly typed)
> >
> > state/: StateStore, BatchingStateRestoreCallback,
> > AbstractNotifyingBatchingRestoreCallback (moved from processor/),
> > PartitionGrouper, WindowStoreIterator, StateSerdes (this one can be moved
> > into state/internals), TimestampedByteStore (we can move this to internals
> > since store types would use ValueAndTimestamp by default, see below),
> > ValueAndTimestamp
> >
> > (new) state/factory/: Stores, StoreBuilder, StoreSupplier; *BUT* the new
> > Stores would not have timestampedXXBuilder APIs since the default
> > StoreSupplier / StoreBuilder value types are ValueAndTimestamp already.
> >
> > (new) state/queryable/: QueryableStoreType, QueryableStoreTypes, HostInfo
> >
> > (new) state/keyValue/: KeyValueXXX classes, and also the same for
> > state/sessionWindow and state/timeWindow; *BUT* here we use
> > ValueAndTimestamp as value types of those APIs directly, and also
> > TimestampedKeyValue/WindowStore would be deprecated.
> >
> > (new) kstream/api/: KStream, KTable, GroupedKStream (renamed from
> > KGroupedStream), GroupedKTable (renamed from KGroupedTable),
> > TimeWindowedKStream, SessionWindowedKStream, GlobalKTable
> >
> > (new) kstream/operator/: Aggregator, ForeachAction, ..., Merger, and
> > Grouped, Joined, Materialized, ..., Printed, and Transformer,
> > TransformerSupplier.
> >
> > (new) kstream/window/: Window, Windows, Windowed, TimeWindows,
> > SessionWindows, UnlimitedWindows, JoinWindows, WindowedSerdes,
> > Time/SessionWindowedSerializer/Deserializer.
> >
> > (new) configure/: RocksDBConfigSetter, TopicNameExtractor,
> > TimestampExtractor, UsePreviousTimeOnInvalidTimestamp,
> > WallclockTimestampExtractor, ExtractRecordMetadataTimestamp,
> > FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp, StateRestoreListener
> >
> > (new) metadata/: StreamsMetadata, ThreadMetadata, TaskMetadata, TaskId
> >
> > As before, any xxx/internals packages would hold internal classes, while
> > the other xxx/yyy packages would be declared as public APIs.
> >
> > -------------------
> >
> > This is a very wild thought, and I can totally understand if people feel
> > it is too much, since it definitely enlarges the scope of this KIP a lot :)
> > I'm just trying to play devil's advocate here for doing a major
> > refactoring and avoiding renaming the Processor classes.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Jun 21, 2019 at 9:51 PM Matthias J. Sax <matthias@confluent.io> wrote:
> > >
> > > I think `RecordProcessor` is a good name.
> > >
> > >
> > > -Matthias
> > >
> > > On 6/21/19 5:09 PM, John Roesler wrote:
> > > > After kicking the naming around a bit more, it seems like any package
> > > > name change is a bit "weird" because it fragments the package and
> > > > directory structure. If we can come up with a reasonable name for the
> > > > interface after all, it seems like the better choice.
> > > >
> > > > The real challenge is that the existing name "Processor" seems just
> > > > about perfect. In picking a new name, we need to consider the ultimate
> > > > state, after the deprecation period, when we entirely remove
> > > > Processor. In this context, TypedProcessor seems a little odd to me,
> > > > because it seems to imply that there should also be an "untyped
> > > > processor".
> > > >
> > > > After kicking around a few other ideas, what does everyone think about
> > > > "RecordProcessor"? I _think_ maybe it stands on its own just fine,
> > > > because it's a thing that processes... records?
> > > >
> > > > If others agree with this, I can change the proposal to RecordProcessor.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Fri, Jun 21, 2019 at 6:42 PM John Roesler <john@confluent.io> wrote:
> > > >>
> > > >> Hi all,
> > > >>
> > > >> I've updated the KIP with the feedback so far.
> > > >>
> > > >> The naming question is still the biggest (only?) outstanding issue. It
> > > >> would be good to hear some more thoughts on it.
> > > >>
> > > >> As we stand now, there's one vote for changing the package name to
> > > >> something like 'typedprocessor', one for changing the interface to
> > > >> TypedProcessor (as in the PoC), and one for just changing the
> > > >> Processor interface in-place, breaking source compatibility.
> > > >>
> > > >> How can we resolve this decision?
> > > >>
> > > >> Thanks,
> > > >> -John
> > > >>
> > > >> On Thu, Jun 20, 2019 at 5:44 PM John Roesler <john@confluent.io> wrote:
> > > >>>
> > > >>> Thanks for the feedback, Guozhang and Matthias,
> > > >>>
> > > >>> Regarding motivation: I'll update the wiki. Briefly:
> > > >>> * Any processor can benefit. Imagine a pure user of the ProcessorAPI
> > > >>> who has very complex processing logic. I have seen several processor
> > > >>> implementations that are hundreds of lines long and call
> > > >>> `context.forward` in many different locations and branches. In such an
> > > >>> implementation, it would be very easy to have a bug in a rarely used
> > > >>> branch that forwards the wrong kind of value. This would structurally
> > > >>> prevent that from happening.
> > > >>> * Also, anyone who heavily uses the ProcessorAPI would likely have
> > > >>> developed helper methods to wire together processors, just as we have
> > > >>> in the DSL implementation. This change would enable them to ensure at
> > > >>> compile time that they are actually wiring together compatible types.
> > > >>> This was actually _my_ original motivation, since I found it very
> > > >>> difficult and time-consuming to follow the Streams DSL internal
> > > >>> builders.
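A minimal Java sketch of the first point above, assuming simplified, hypothetical stand-ins (`TypedContext`, `TypedProcessor`, `WordLengthProcessor`, `CollectingContext`) rather than the actual Kafka Streams interfaces: because the context carries the output types, a rarely used branch that forwards the wrong kind of value fails to compile instead of misbehaving at run time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for a typed Processor API (not the real
// Kafka Streams interfaces): the context is parameterized by the output types.
interface TypedContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

interface TypedProcessor<KIn, VIn, KOut, VOut> {
    void init(TypedContext<KOut, VOut> context);

    void process(KIn key, VIn value);
}

// Declares that it consumes <String, String> and emits <String, Long>.
class WordLengthProcessor implements TypedProcessor<String, String, String, Long> {
    private TypedContext<String, Long> context;

    @Override
    public void init(TypedContext<String, Long> context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        if (value == null) {
            return; // drop null values
        }
        if (value.isEmpty()) {
            // A rarely used branch: forwarding a String here, e.g.
            // context.forward(key, "empty"), would not compile.
            context.forward(key, 0L);
            return;
        }
        context.forward(key, (long) value.length());
    }
}

// Records forwarded key/value pairs so the sketch runs without Kafka.
class CollectingContext<K, V> implements TypedContext<K, V> {
    final List<String> forwarded = new ArrayList<>();

    @Override
    public void forward(K key, V value) {
        forwarded.add(key + "=" + value);
    }
}

public class TypedProcessorDemo {
    public static void main(String[] args) {
        CollectingContext<String, Long> context = new CollectingContext<>();
        WordLengthProcessor processor = new WordLengthProcessor();
        processor.init(context);
        processor.process("a", "kafka");
        processor.process("b", "");
        processor.process("c", null);
        System.out.println(context.forwarded); // [a=5, b=0]
    }
}
```

With an untyped context, the commented-out `forward` call would compile and only show up as a bad record downstream; here the compiler rejects it.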
> > > >>>
> > > >>> Regarding breaking the source compatibility of Processor: I would
> > > >>> _love_ to side-step the naming problem, but I really don't know if
> > > >>> it's excusable to break compatibility. I suspect that our oldest and
> > > >>> dearest friends are using the ProcessorAPI in some form or another,
> > > >>> and all their source code would break. It sucks to have to create a
> > > >>> whole new interface to get around this, but it feels like the right
> > > >>> thing to do. It would be nice to get even more feedback on this
> > > >>> point, though.
> > > >>>
> > > >>> Regarding the types of stores, as I said in my response to Sophie,
> > > >>> it's not an issue.
> > > >>>
> > > >>> Regarding the change to StreamsBuilder, it doesn't pin the types in
> > > >>> any way, since all the types are bounded by Object only, and there are
> > > >>> no extra constraints between arguments (each type is used only once in
> > > >>> one argument). But maybe I missed the point you were asking about.
> > > >>> Since the type takes generic parameters, we should allow users to pass
> > > >>> in parameterized arguments. Otherwise, they would _have to_ give us a
> > > >>> raw type, and they would be forced to get a "rawtypes" warning from the
> > > >>> compiler. So, it's our obligation in any API that accepts a
> > > >>> parameterized-type parameter to allow people to actually pass a
> > > >>> parameterized type, even if we don't actually use the parameters.
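The argument about parameterized arguments can be sketched as follows, assuming hypothetical `Processor`, `ProcessorSupplier`, and `Builder` types that only mirror the shape of the discussion (they are not the actual `StreamsBuilder` signatures). Because Java generics are invariant, a parameter typed with `Object` arguments rejects a caller's parameterized supplier, whereas a generic method whose type parameters are each used once accepts it without constraining anything.

```java
// Hypothetical, simplified stand-ins (not the real Kafka Streams types).
interface Processor<KIn, VIn, KOut, VOut> {
    void process(KIn key, VIn value);
}

interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
    Processor<KIn, VIn, KOut, VOut> get();
}

class Builder {
    int registered = 0;

    // Too strict: Java generics are invariant, so a
    // ProcessorSupplier<String, Long, Void, Void> is not a
    // ProcessorSupplier<Object, Object, Void, Void>. Callers holding a
    // parameterized supplier could only get past this with a raw type.
    void addStrict(ProcessorSupplier<Object, Object, Void, Void> supplier) {
        registered++;
    }

    // KIn and VIn are bounded only by Object and each appears once, so they
    // constrain nothing; they only let callers pass a parameterized supplier.
    <KIn, VIn> void addGeneric(ProcessorSupplier<KIn, VIn, Void, Void> supplier) {
        registered++;
    }
}

public class GenericParameterDemo {
    public static void main(String[] args) {
        Builder builder = new Builder();
        ProcessorSupplier<String, Long, Void, Void> supplier =
                () -> (key, value) -> { /* e.g. update a store; forwards nothing */ };
        builder.addGeneric(supplier); // compiles cleanly
        // builder.addStrict(supplier); // does not compile: incompatible types
        System.out.println(builder.registered); // 1
    }
}
```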
> > > >>>
> > > >>> The naming question is a complex one, as I took pains to detail
> > > >>> previously. Please don't just pick out one minor point, call it weak,
> > > >>> and then claim that it invalidates the whole decision. I don't think
> > > >>> there's a clear best choice, so I'm more than happy for someone to
> > > >>> advocate for renaming the class instead of the package. Can you
> > > >>> provide some reasons why you think that would be better?
> > > >>>
> > > >>> Regarding the deprecated methods, you're absolutely right. I'll
> > > >>> update the KIP.
> > > >>>
> > > >>> Thanks again for all the feedback!
> > > >>> -John
> > > >>>
> > > >>> On Thu, Jun 20, 2019 at 4:34 PM Matthias J. Sax <matthias@confluent.io> wrote:
> > > >>>>
> > > >>>> Just want to second what Sophie said about the stores. The type of a
> > > >>>> used store is completely independent of the input/output types.
> > > >>>>
> > > >>>> This relates to the change to the `addGlobalStore()` method. Why do
> > > >>>> you want to pin the types? In fact, people request the ability to
> > > >>>> filter() and maybe even map() the data before they are put into the
> > > >>>> global store. Limiting the types seems to be a step backward here?
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Also, the package name is questionable.
> > > >>>>
> > > >>>>> This wouldn't be the first project to do something like this...
> > > >>>>
> > > >>>> Not a strong argument. I would actually propose to not add a new
> > > >>>> package, but just a new class `TypedProcessor`.
> > > >>>>
> > > >>>>
> > > >>>> For the `ProcessorContext#forward` methods -- some of those methods
> > > >>>> are already deprecated. While they will still be affected, it would
> > > >>>> be worth marking them as deprecated in the wiki page, too.
> > > >>>>
> > > >>>>
> > > >>>> @Guozhang: I don't think we should break source compatibility in a
> > > >>>> minor release.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On 6/20/19 1:43 PM, Guozhang Wang wrote:
> > > >>>>> Hi John,
> > > >>>>>
> > > >>>>> Thanks for the KIP! I have a few comments below:
> > > >>>>>
> > > >>>>> 1. So far the "Motivation" section is very general, and the only
> > > >>>>> concrete example that I have in mind is `TransformValues#punctuate`.
> > > >>>>> Do we have any other concrete issues that drive this KIP? If not, I
> > > >>>>> feel it would be better to narrow the scope of this KIP to:
> > > >>>>>
> > > >>>>> 1.a) modifying ProcessorContext only with the output types on forward.
> > > >>>>> 1.b) modifying the Transformer signature to have generics of
> > > >>>>> ProcessorContext, and then lifting the restriction against using
> > > >>>>> punctuate: if users do not follow the enforced typing and just code
> > > >>>>> without generics, they will get a warning at compile time and a
> > > >>>>> run-time error if they forward wrong-typed records, which I think
> > > >>>>> would be acceptable.
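Point 1.b relies on how Java treats code written without generics. A minimal sketch, assuming a hypothetical `TypedContext` interface and `Downstream` consumer rather than the real `ProcessorContext`: raw-typed user code still compiles (with warnings), and a wrong-typed forward fails at run time with a `ClassCastException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a context typed by its output.
interface TypedContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

// A downstream consumer that expects <String, Long>.
class Downstream implements TypedContext<String, Long> {
    final List<Long> received = new ArrayList<>();

    @Override
    public void forward(String key, Long value) {
        received.add(value + 1);
    }
}

public class RawTypeDemo {
    public static void main(String[] args) {
        Downstream downstream = new Downstream();
        // User code written without generics: compiling with -Xlint reports
        // "rawtypes" and "unchecked" warnings, but the code still compiles.
        TypedContext untyped = downstream;
        untyped.forward("k", 41L); // right types: works
        try {
            untyped.forward("k", "oops"); // wrong value type
        } catch (ClassCastException e) {
            // The compiler-generated bridge method casts the arguments to
            // (String, Long), so the mistake surfaces at run time.
            System.out.println("run-time error: " + e.getClass().getSimpleName());
        }
        System.out.println(downstream.received); // [42]
    }
}
```

In this sketch the failure happens in the bridge method at the downstream boundary; where exactly such an error would surface in Kafka Streams depends on the real implementation.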
> > > >>>>>
> > > >>>>> I feel this would be a good solution for this specific issue; again,
> > > >>>>> feel free to update the wiki page with other known issues that cannot
> > > >>>>> be resolved.
> > > >>>>>
> > > >>>>> 2. If we want to go with the current scope, then my next question
> > > >>>>> would be: how much breakage would we introduce if we just modified
> > > >>>>> the Processor signature directly? My feeling is that DSL users would
> > > >>>>> most likely not be affected, and PAPI users would only need to modify
> > > >>>>> a few lines in their class declarations. I feel it's worth doing some
> > > >>>>> research on this part and then deciding whether we really want to
> > > >>>>> bite the bullet of duplicated Processor / ProcessorSupplier classes
> > > >>>>> for maintaining compatibility.
> > > >>>>>
> > > >>>>>
> > > >>>>> Guozhang
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> On Wed, Jun 19, 2019 at 12:21 PM John Roesler <john@confluent.io> wrote:
> > > >>>>>
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> In response to the feedback so far, I changed the package name from
> > > >>>>>> `processor2` to `processor.generic`.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> -John
> > > >>>>>>
> > > >>>>>> On Mon, Jun 17, 2019 at 4:49 PM John Roesler <john@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>> Thanks for the feedback, Sophie!
> > > >>>>>>>
> > > >>>>>>> I actually felt a little uneasy when I wrote that remark, because
> > > >>>>>>> it's not restricted at all in the API; it's just available to you if
> > > >>>>>>> you choose to give your stores and context the same parameters. So, I
> > > >>>>>>> think your use case is valid, and also perfectly permissible under
> > > >>>>>>> the current KIP. Sorry for sowing confusion on my own discussion
> > > >>>>>>> thread!
> > > >>>>>>>
> > > >>>>>>> I'm not crazy about the package name, either. I went with it only
> > > >>>>>>> because there's seemingly nothing special about the new package
> > > >>>>>>> except that it can't have the same name as the old one. Otherwise,
> > > >>>>>>> the existing "processor" and "Processor" names for the package and
> > > >>>>>>> class are perfectly satisfying. Rather than pile on additional
> > > >>>>>>> semantics, it seemed cleaner to just add a number to the package
> > > >>>>>>> name.
> > > >>>>>>>
> > > >>>>>>> This wouldn't be the first project to do something like this...
> > > >>>>>>> Apache Commons, for example, has added a "2" to the end of some of
> > > >>>>>>> their packages for exactly the same reason.
> > > >>>>>>>
> > > >>>>>>> I'm open to any suggestions. For example, we could do something like
> > > >>>>>>> org.apache.kafka.streams.typedprocessor.Processor or
> > > >>>>>>> org.apache.kafka.streams.processor.typed.Processor, which would have
> > > >>>>>>> just about the same effect. One microscopic thought is that, if
> > > >>>>>>> there's another interface in the "processor" package that we wish to
> > > >>>>>>> do the same thing to, we _could_ pile it into "processor2", but we
> > > >>>>>>> couldn't do the same if we use a package that has "typed" in the
> > > >>>>>>> name, unless that change is _also_ related to types in some way. But
> > > >>>>>>> this seems like a very minor concern.
> > > >>>>>>>
> > > >>>>>>> What's your preference?
> > > >>>>>>> -John
> > > >>>>>>>
> > > >>>>>>> On Mon, Jun 17, 2019 at 3:56 PM Sophie Blee-Goldman <sophie@confluent.io> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hey John, thanks for writing this up! I like the proposal, but
> > > >>>>>>>> there's one point that I think may be too restrictive:
> > > >>>>>>>>
> > > >>>>>>>> "A processor that happens to use a typed store is actually emitting
> > > >>>>>>>> the same types that it is storing."
> > > >>>>>>>>
> > > >>>>>>>> I can imagine someone could want to leverage this new type safety
> > > >>>>>>>> without also limiting how they can interact with/use their store. As
> > > >>>>>>>> an (admittedly contrived) example, say you have an input stream of
> > > >>>>>>>> purchases of a certain type (entertainment, food, etc.), and on
> > > >>>>>>>> seeing a new record you want to output the number of purchase types
> > > >>>>>>>> for which a shopper has made more than 5 purchases in the last
> > > >>>>>>>> month. Your state store will probably be holding some more
> > > >>>>>>>> complicated PurchaseHistory object (keyed by user), but your output
> > > >>>>>>>> is just a <User, Long>.
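Sophie's example can be sketched as follows, assuming a hypothetical `TypedContext` interface, a `PurchaseHistory` class, and a plain `HashMap` in place of a real state store (the one-month window is left out). The store's value type and the processor's output types are declared independently, so typing the output does not constrain the store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a context typed by its output (not the real API).
interface TypedContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

// Hypothetical store value: purchase counts per category for one user.
class PurchaseHistory {
    final Map<String, Integer> countsByCategory = new HashMap<>();
}

// Input: <user, category>. Store: <user, PurchaseHistory>. Output: <user, Long>.
class FrequentCategoryProcessor {
    // A plain map stands in for a state store; the one-month window is omitted.
    private final Map<String, PurchaseHistory> store = new HashMap<>();
    private final TypedContext<String, Long> context;

    FrequentCategoryProcessor(TypedContext<String, Long> context) {
        this.context = context;
    }

    void process(String user, String category) {
        PurchaseHistory history = store.computeIfAbsent(user, u -> new PurchaseHistory());
        history.countsByCategory.merge(category, 1, Integer::sum);
        // Count the categories with more than 5 purchases for this user.
        long categoriesOverFive = history.countsByCategory.values().stream()
                .filter(count -> count > 5)
                .count();
        context.forward(user, categoriesOverFive);
    }
}

public class PurchaseDemo {
    public static void main(String[] args) {
        List<Long> output = new ArrayList<>();
        FrequentCategoryProcessor processor =
                new FrequentCategoryProcessor((user, value) -> output.add(value));
        for (int i = 0; i < 6; i++) {
            processor.process("alice", "food");
        }
        System.out.println(output.get(output.size() - 1)); // 1
    }
}
```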
> > > >>>>>>>>
> > > >>>>>>>> I'm also not crazy about "processor2" as the package name ... not
> > > >>>>>>>> sure what a better one would be though (something with "typed"?)
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Jun 17, 2019 at 12:47 PM John Roesler <john@confluent.io> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi all,
> > > >>>>>>>>>
> > > >>>>>>>>> I'd like to propose KIP-478 (https://cwiki.apache.org/confluence/x/2SkLBw).
> > > >>>>>>>>>
> > > >>>>>>>>> This proposal would add output type bounds to the Processor
> > > >>>>>>>>> interface in Kafka Streams, which enables static checking of a
> > > >>>>>>>>> number of useful properties:
> > > >>>>>>>>> * A processor B that consumes the output of processor A is actually
> > > >>>>>>>>> expecting the same types that processor A produces.
> > > >>>>>>>>> * A processor that happens to use a typed store is actually emitting
> > > >>>>>>>>> the same types that it is storing.
> > > >>>>>>>>> * A processor is simply forwarding the expected types in all code
> > > >>>>>>>>> paths.
> > > >>>>>>>>> * Processors added via the Streams DSL, which are not permitted to
> > > >>>>>>>>> forward results at all, are statically prevented from doing so by
> > > >>>>>>>>> the compiler.
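The first and last properties above can be illustrated with a minimal sketch, assuming hypothetical `TypedContext`, `TypedProcessor`, and `Wiring.chain` names (the real API may differ): a wiring helper only compiles when the downstream input types match the upstream output types, and a terminal processor typed with `Void` outputs has nothing but `null` to forward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for a typed Processor API (not the real
// Kafka Streams interfaces).
interface TypedContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

interface TypedProcessor<KIn, VIn, KOut, VOut> {
    void process(KIn key, VIn value, TypedContext<KOut, VOut> context);
}

final class Wiring {
    private Wiring() { }

    // Hypothetical helper: `downstream` must accept exactly the types that
    // `upstream` emits (KMid, VMid), or the call does not compile.
    static <KIn, VIn, KMid, VMid, KOut, VOut> TypedProcessor<KIn, VIn, KOut, VOut> chain(
            TypedProcessor<KIn, VIn, KMid, VMid> upstream,
            TypedProcessor<KMid, VMid, KOut, VOut> downstream) {
        return (key, value, context) ->
                upstream.process(key, value, (k, v) -> downstream.process(k, v, context));
    }
}

public class WiringDemo {
    public static void main(String[] args) {
        TypedProcessor<String, String, String, Integer> length =
                (key, value, context) -> context.forward(key, value.length());

        // A terminal processor declares <Void, Void> outputs, so in this sketch
        // the only thing it could forward is (null, null).
        List<String> sunk = new ArrayList<>();
        TypedProcessor<String, Integer, Void, Void> sink =
                (key, value, context) -> sunk.add(key + ":" + value);

        TypedProcessor<String, String, Void, Void> pipeline = Wiring.chain(length, sink);
        // Wiring.chain(sink, length); // does not compile: <Void, Void> vs <String, String>
        pipeline.process("a", "kafka", (k, v) -> { });
        System.out.println(sunk); // [a:5]
    }
}
```

Using `Void` does not make `forward(null, null)` impossible; it narrows what can be forwarded to `null`, which is about as close as Java's type system gets to "forwards nothing".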
> > > >>>>>>>>>
> > > >>>>>>>>> Internally, we can use the above properties to achieve a much
> > > >>>>>>>>> higher level of confidence in the Streams DSL implementation's
> > > >>>>>>>>> correctness. Actually, while doing the POC, I found a few bugs and
> > > >>>>>>>>> mistakes, which become structurally impossible with KIP-478.
> > > >>>>>>>>>
> > > >>>>>>>>> Additionally, the stronger types dramatically improve the
> > > >>>>>>>>> self-documentation of our Streams internal implementations, which
> > > >>>>>>>>> makes it much easier for new contributors to ramp up with
> > > >>>>>>>>> confidence.
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks so much for your consideration!
> > > >>>>>>>>> -John
> > > >>>>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > >
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang
