spark-issues mailing list archives

From "Saisai Shao (JIRA)" <>
Subject [jira] [Commented] (SPARK-4960) Interceptor pattern in receivers
Date Tue, 30 Dec 2014 01:33:13 GMT


Saisai Shao commented on SPARK-4960:

Hi Cody, please see the inline comments.

> You're saying that for an implementation that currently extends e.g.
> a user can provide a function
> T => Iterable[M]
> But in the case of Kafka, T is currently fixed to (K, V). So for Kafka we don't need a
> user-provided function of type
> (K, V) => Iterable[M]
> We need a user-provided function of type
> MessageAndMetadata => Iterable[M]
> In other words, a third type parameter.

I understand your requirement for Kafka. The solution described in the doc is a general one;
if it is acceptable, I will change KafkaReceiver accordingly.
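
To make the type mismatch above concrete, here is a minimal, Spark-free Scala sketch.
`MessageAndMetadata` is a hypothetical, simplified stand-in for the Kafka class of that name
(not the real class), and `InterceptorShapes`, `byValue` and `byTopic` are illustrative names
rather than Spark or Kafka API. With T fixed to (K, V), an interceptor of shape
T => Iterable[M] only sees the key and value; taking the richer record as input, effectively
a third type parameter, also lets it filter on metadata such as the topic.

```scala
// Hypothetical, simplified stand-in for Kafka's MessageAndMetadata (not the real class).
final case class MessageAndMetadata[K, V](
    topic: String, partition: Int, offset: Long, key: K, message: V)

object InterceptorShapes {
  // The generic shape proposed for a parent class: T => Iterable[M].
  type Interceptor[T, M] = T => Iterable[M]

  // With T fixed to (K, V), only the key and value are visible:
  // drop empty values, upper-case the rest.
  def byValue: Interceptor[(String, String), String] = {
    case (_, v) => if (v.nonEmpty) List(v.toUpperCase) else Nil
  }

  // With the full record as input (a third type parameter), metadata is available:
  // keep only records from the "events" topic, emitting plain (key, value) pairs.
  def byTopic: Interceptor[MessageAndMetadata[String, String], (String, String)] =
    mmd => if (mmd.topic == "events") List((mmd.key, mmd.message)) else Nil
}
```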

> I'm also not clear on how your proposed solution deals with the 9 different overloads of store(),
> including the one that takes raw serialized bytes.

> At that point I'm not sure that having an interceptor setter defined on a parent class makes
> a lot of sense, because it's the particular subclass that knows what its intermediate third
> type is (MessageAndMetadata in this case), as well as which store method(s) it cares about.

Sorry, I overlooked the store() API that takes raw serialized bytes. I will revise the design
doc to find a better approach :).
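
A small, Spark-free Scala sketch shows why the raw-bytes overload is awkward for a
parent-class interceptor. `SketchReceiver` and its three `store` variants are hypothetical
stand-ins for a few of the real store() overloads, and for simplicity the interceptor here
maps T to T. The interceptor can wrap the single-item and iterator variants, but the
`ByteBuffer` variant only holds serialized data, so it is bypassed unless the receiver
deserializes first.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical receiver base class; not Spark's Receiver API.
class SketchReceiver[T] {
  // Parent-class interceptor typed on T; identity by default.
  private var interceptor: T => Iterable[T] = (t: T) => List(t)
  def setInterceptor(f: T => Iterable[T]): Unit = interceptor = f

  val storedItems = ArrayBuffer.empty[T]
  val storedBlocks = ArrayBuffer.empty[ByteBuffer]

  // Single-item variant: the interceptor can be applied.
  def store(item: T): Unit = storedItems ++= interceptor(item)

  // Iterator variant: the interceptor is applied element by element.
  def store(items: Iterator[T]): Unit = items.foreach(item => store(item))

  // Raw-bytes variant: the data is already serialized, so a T => Iterable[T]
  // interceptor cannot see individual items; it is bypassed here.
  def store(bytes: ByteBuffer): Unit = storedBlocks += bytes
}
```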

> That's part of why I think constructor arguments are actually a cleaner way to handle this:
> Kafka can have an "interceptor" argument that defaults to a function
> MessageAndMetadata => Iterable[(K, V)], and other implementations can have a type signature
> for the interceptor that makes sense for them.
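
The constructor-argument alternative might look roughly like this in Scala.
`KafkaStyleReceiver`, `onMessage` and the simplified `MessageAndMetadata` are hypothetical
names, not the actual Spark or Kafka API; the point is only that the interceptor's type is
chosen by the receiver itself, with a default that keeps the plain (key, value) pair.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Kafka's MessageAndMetadata (not the real class).
final case class MessageAndMetadata[K, V](topic: String, offset: Long, key: K, message: V)

// Hypothetical Kafka-style receiver whose "interceptor" is a constructor argument.
// Its type is specific to this receiver; the default keeps each (key, value) pair.
class KafkaStyleReceiver[K, V](
    interceptor: MessageAndMetadata[K, V] => Iterable[(K, V)] =
      (mmd: MessageAndMetadata[K, V]) => List((mmd.key, mmd.message))
) {
  private val stored = ArrayBuffer.empty[(K, V)]

  // Called for each fetched record; only the interceptor's output is stored.
  def onMessage(mmd: MessageAndMetadata[K, V]): Unit = stored ++= interceptor(mmd)

  def storedRecords: List[(K, V)] = stored.toList
}
```

Callers that do not care keep writing `new KafkaStyleReceiver[String, Int]()`, while a caller
can pass e.g. `mmd => if (mmd.offset >= 10) List((mmd.key, mmd.message)) else Nil` to filter
on metadata.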

> As an aside, I think it should actually be TraversableOnce, not Iterable. All we care about
> is being able to call foreach on it once, and the classes that implement TraversableOnce are
> a superset of those that implement Iterable.
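
A rough illustration of the single-pass argument, written for Scala 2.13 or later, where
TraversableOnce is a deprecated alias of `IterableOnce`. `SinglePassStore.storeAll` is a
hypothetical helper: since it walks its input once through `iterator`, it accepts an
`Iterator` (which is not an `Iterable`), a `List`, or an `Option` alike.

```scala
import scala.collection.mutable.ArrayBuffer

object SinglePassStore {
  // Hypothetical helper: traverses its input exactly once and returns how many items it stored.
  def storeAll[M](items: IterableOnce[M], sink: ArrayBuffer[M]): Int = {
    var count = 0
    items.iterator.foreach { m =>
      sink += m
      count += 1
    }
    count
  }
}
```

An interceptor typed to return `IterableOnce[M]` could therefore return something like
`line.split(",").iterator` without first materializing a collection.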

Putting "interceptor" in the constructor arguments is actually a cleaner way, but since every
receiver-related API would need this "interceptor" argument, changing the existing APIs would be
a lot of work and would also break binary compatibility. The prerequisite of my design is not to
change the current API.

My design goals are minimal modification of the current code and generality, though, as you
mentioned, this design may not be as straightforward as adding an "interceptor" to the constructor
arguments. Thanks a lot for your review and comments; I appreciate your time :).

> Interceptor pattern in receivers
> --------------------------------
>                 Key: SPARK-4960
>                 URL:
>             Project: Spark
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Tathagata Das
> Sometimes it is good to intercept a message received through a receiver and modify /
> do something with the message before it is stored into Spark. This is often referred to as
> the interceptor pattern. There should be a general way to specify an interceptor function that
> gets applied to all receivers.

This message was sent by Atlassian JIRA
