samza-dev mailing list archives

From Guozhang Wang <>
Subject Re: [DISCUSS] KIP-28 - Add a transform client for data processing
Date Wed, 19 Aug 2015 17:00:07 GMT
Hello Yan,

Thanks for the feedback, replies inlined.


On Wed, Aug 19, 2015 at 8:22 AM, Yan Fang <> wrote:

> Hi Guozhang,
> Thank you for writing the KIP-28 up. (Hope this is the right thread for me
> to post some comments. :)
> I still have some confusion about the implementation of the Processor:
> 1. why do we maintain a separate consumer and producer for each worker
> thread?
>     — from my understanding, the new consumer api will be able to fetch
> certain topic-partition. Is one consumer enough for one Kafka.process (it
> is shared among work threads)? The same thing for the producer, is one
> producer enough for sending out messages to the brokers? Will this have
> better performance?

We have considered both one-consumer-per-process and one-consumer-per-thread.
With one-consumer-per-process, each iteration works as follows: the process
first checks whether it needs to use the consumer to fetch more data, then
lets the threads process their corresponding partitions' records, and then
does a barrier synchronization for commit / flush / etc. The main motivation
for choosing the other approach is to avoid having one thread's wait for data
block the other threads' processing: as mentioned in the wiki, we will also
make a blocking poll call on the consumer if the corresponding partitions'
record queues are low or empty (otherwise we just call poll(0)). With
one-consumer-per-process, this means that if one thread is lacking data it
will trigger blocking calls on the shared consumer, and hence block the other
threads from proceeding.
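To make the per-thread choice concrete, here is a minimal Java sketch of how a worker thread that owns its own consumer might pick its poll timeout. The class name, the low-watermark threshold and the blocking timeout are all hypothetical, and the real code may use different criteria. Because the consumer is private to the thread, the blocking branch only stalls that one thread.

```java
import java.util.Map;

/**
 * Hypothetical sketch: how a worker thread that owns its own consumer
 * might choose the timeout for its next poll. Names and thresholds are assumptions.
 */
final class PollTimeoutPolicy {
    private final int lowWatermark;     // buffered-record count considered "low" (assumed)
    private final long blockingPollMs;  // timeout used when this thread needs data (assumed)

    PollTimeoutPolicy(int lowWatermark, long blockingPollMs) {
        this.lowWatermark = lowWatermark;
        this.blockingPollMs = blockingPollMs;
    }

    /**
     * @param queueSizes buffered record count per partition owned by this thread
     * @return timeout to pass to this thread's own consumer poll
     */
    long pollTimeout(Map<String, Integer> queueSizes) {
        for (int size : queueSizes.values()) {
            if (size <= lowWatermark) {
                // Some partition is low or empty: block. Only this thread waits,
                // because no other thread shares this consumer.
                return blockingPollMs;
            }
        }
        // Every queue has enough data: just take whatever is ready, i.e. poll(0).
        return 0L;
    }
}
```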

> 2. how is the “Stream Synchronization” achieved?
>     — you talked about “pause” and “notify” the consumer. Still not very
> clear. If worker thread has group_1 {topicA-0, topicB-0} and group_2
> {topicA-1, topicB-1}, and topicB is much slower. How can we pause the
> consumer to sync topicA and topicB if there is only one consumer?
The details of "pause / resume" are in [...]. In short, the consumer sends
fetch requests with the list of partitions to the broker; when a partition
is paused, the consumer will not include it in its fetch requests until it
is "resumed", but it still owns the partition.
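A minimal, self-contained Java model of the pause / resume semantics described above. It does not talk to a broker; `PausableFetcher` and its methods are hypothetical names, and partitions are plain strings such as "topicA-0". The released Java `KafkaConsumer` later exposed `pause(...)` / `resume(...)` methods; this sketch only models the behavior, not that API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a consumer that can pause partitions it owns. */
final class PausableFetcher {
    private final Set<String> assigned = new LinkedHashSet<>(); // owned partitions
    private final Set<String> paused = new LinkedHashSet<>();   // owned but not fetched

    void assign(String partition) { assigned.add(partition); }

    void pause(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("not assigned: " + partition);
        }
        paused.add(partition); // ownership is kept; it is only skipped when fetching
    }

    void resume(String partition) { paused.remove(partition); }

    boolean owns(String partition) { return assigned.contains(partition); }

    /** Partitions to list in the next fetch request sent to the broker. */
    List<String> nextFetchPartitions() {
        List<String> result = new ArrayList<>();
        for (String p : assigned) {
            if (!paused.contains(p)) result.add(p);
        }
        return result;
    }
}
```

One way this could apply to Yan's example: if topicB is much slower, the thread could pause topicA-0 while its buffered records are far ahead in time, keep fetching topicB-0, and resume topicA-0 once topicB-0 catches up.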

> 3. how does the partition timestamp monotonically increase?
>     — “When the lowest timestamp corresponding record gets processed by
> the thread, the partition time possibly gets advanced.” How does the “gets
> advanced” work? Do we get another “lowest message timestamp value”? But
> by doing so, we may not get an “advanced” timestamp.

First, each message has a timestamp that can be extracted by a
user-provided extractor function.

Each partition has a record queue, and the partition timestamp is defined
as the lowest timestamp inside the record queue, BUT it will not go back
once advanced. For example, let's say we have the following record queue
(message timestamps shown):

head: 5, 5, 4, 2, 3, 1

where new messages are added at the head by the consumer, and threads
process messages from the tail. Right now the partition timestamp is 1.
Let's say we have processed the message with timestamp 1; now the record
queue is:

head: 5, 5, 4, 2, 3

The lowest timestamp is now 2, so we advance the partition timestamp to
2. Now say the consumer adds a new message with timestamp 0 at the head:

head: 0, 5, 5, 4, 2, 3

Although this message's timestamp is lower than 2, the partition timestamp
has already advanced to 2, so it will not go back to 0.

With that, the stream timestamp is defined as the lowest partition
timestamp among all partitions assigned to a process.
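The partition-timestamp rule can be sketched in Java as follows. This is a hypothetical illustration that stores only the (already extracted) timestamps; the class name is made up. The deque's first element plays the role of the head, where the consumer adds records, and workers remove from the tail. Replaying the example above (adding 1, 3, 2, 4, 5, 5 so the queue reads 5, 5, 4, 2, 3, 1 from the head, processing one record, then adding 0) gives partition times 1, 2 and 2.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;

/** Hypothetical per-partition record queue that keeps only timestamps for brevity. */
final class PartitionRecordQueue {
    // First element = head (consumer adds here); last element = tail (threads process here).
    private final Deque<Long> timestamps = new ArrayDeque<>();
    private long partitionTime = Long.MIN_VALUE; // "unknown" until the first record arrives

    /** Consumer side: add a record's extracted timestamp at the head. */
    void add(long timestamp) {
        timestamps.addFirst(timestamp);
        maybeAdvance();
    }

    /** Worker side: process the record at the tail (throws if the queue is empty). */
    long processNext() {
        long ts = timestamps.removeLast();
        maybeAdvance();
        return ts;
    }

    long partitionTime() { return partitionTime; }

    // Partition time = lowest buffered timestamp, but it never moves backwards.
    private void maybeAdvance() {
        if (timestamps.isEmpty()) return;
        long lowest = Long.MAX_VALUE;
        for (long ts : timestamps) lowest = Math.min(lowest, ts);
        if (lowest > partitionTime) partitionTime = lowest;
    }

    /** Stream time: lowest partition time over the assigned partitions (MAX_VALUE if none). */
    static long streamTime(Collection<PartitionRecordQueue> partitions) {
        long min = Long.MAX_VALUE;
        for (PartitionRecordQueue q : partitions) min = Math.min(min, q.partitionTime());
        return min;
    }
}
```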

> 4. thoughts about the local state management.
>     — from the description, I think there is one kv store per
> partition-group. That means if one work thread is assigned more than one
> partition groups, it will have more than one kv-store connections. How can
> we avoid mis-operation? Because one partition group can easily write to
> another partition group’s kv store (they are in the same thread).
Each partition-group is effectively a "task" that has its own topology and
state stores, and is assigned an id. The local kv-store is associated
with the partition-group id, so a partition-group will not write to
another group's kv-store.
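A hypothetical Java sketch of the isolation argument: stores are looked up by partition-group (task) id, and each task captures only its own store when it is created, so code inside one task has no handle on another group's store even when both run on the same thread. The class names and the in-memory `HashMap` store are stand-ins for a real kv-store.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical: one key-value store per partition group (task), keyed by task id. */
final class TaskStateStores {
    private final Map<Integer, Map<String, String>> storesByTaskId = new HashMap<>();

    /** Create (or return) the store owned by the given task id. */
    Map<String, String> storeFor(int taskId) {
        return storesByTaskId.computeIfAbsent(taskId, id -> new HashMap<>());
    }
}

/** Hypothetical task: it only ever sees the store bound to its own id at construction. */
final class PartitionGroupTask {
    final int id;
    private final Map<String, String> store;

    PartitionGroupTask(int id, TaskStateStores stores) {
        this.id = id;
        this.store = stores.storeFor(id); // no reference to any other task's store
    }

    void process(String key, String value) { store.put(key, value); }

    String get(String key) { return store.get(key); }
}
```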

> 5. do we plan to implement the throttle ?
>     — since we are “forwarding” the messages. It is very possible that,
> upstream-processor is much faster than the downstream-processor, how do we
> plan to deal with this?
I am not sure I understand this question, but here is a reply based on my
understanding:

An operator is either connected to another operator "in-memory", meaning
that records do not go through intermediate Kafka logs, or it pipes its
output to Kafka, where it is consumed by another operator. In the first
case there should be no need for "back-pressure", as there are no internal
queues at all between these operators; in the latter case Kafka should
already handle back-pressure for us.
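To illustrate the in-memory case, here is a hypothetical Java sketch in which `forward()` is a plain method call to the downstream nodes. Because `process()` on the upstream node returns only after its children have finished, an upstream node cannot run ahead of a downstream one within the same thread, so there is no queue that could fill up. The class names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical processor node that forwards synchronously to its children. */
abstract class Node {
    private final List<Node> children = new ArrayList<>();

    Node addChild(Node child) {
        children.add(child);
        return child;
    }

    abstract void process(String key, String value);

    /** Direct method call: returns only after every downstream node has finished. */
    protected void forward(String key, String value) {
        for (Node child : children) child.process(key, value);
    }
}

/** Example upstream operator: upper-cases values and forwards them. */
final class UpperCaseNode extends Node {
    @Override
    void process(String key, String value) { forward(key, value.toUpperCase()); }
}

/** Example downstream sink that just records what it received. */
final class CollectingSink extends Node {
    final List<String> seen = new ArrayList<>();

    @Override
    void process(String key, String value) { seen.add(key + "=" + value); }
}
```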

> 6. how does the parallelism work?
>     — we achieve this by simply adding more threads? Or we plan to have
> the mechanism which can deploy different threads to different machines? It
> is easy to imagine that we can deploy different processors to different
> machines, but how about the work threads? And how does fault tolerance work?
> Maybe this is out-of-scope of the KIP?
KStream is designed to be lightweight, so it will not have any deployment
mechanism: users just write a main method that specifies the topology, and
possibly the number of threads in configs, and then call run(). They can of
course start multiple such processes as well, either directly through
scripts or through YARN Slider / Mesos Marathon / etc.
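A hypothetical Java sketch of the "just a main method" deployment model: the process reads a thread count from its config and runs that many worker threads inside one JVM, with no scheduler involved; scaling out means launching more copies of the same program. The config key `num.threads`, the class name and the worker factory are all assumptions, not the KStream API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.IntFunction;

/** Hypothetical library-style process: no deployment layer, just threads in this JVM. */
final class LightweightProcess {
    static final String NUM_THREADS_CONFIG = "num.threads"; // hypothetical config key

    private final int numThreads;
    private final IntFunction<Runnable> workerFactory; // builds the loop for thread i

    LightweightProcess(Properties config, IntFunction<Runnable> workerFactory) {
        this.numThreads = Integer.parseInt(config.getProperty(NUM_THREADS_CONFIG, "1"));
        this.workerFactory = workerFactory;
    }

    /** Start one worker thread per configured thread and wait for all of them. */
    void run() throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(workerFactory.apply(i), "stream-thread-" + i);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) t.join();
    }
}
```

A user's main method would build a `Properties` object, set the thread count, and call `run()`; running the same program on several machines is how it would scale beyond one process.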

> Two nits in the KIP-28 doc:
> 1. the “close” method is missing from interface Processor<K1,V1,K2,V2>. We
> have “override close()” in KafkaProcessor.
> 2. “punctuate” does not accept a parameter, while StatefulProcessJob has a
> punctuate method that accepts a parameter.
Cool, will fix them.
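For reference, a hypothetical Java shape of the `Processor` interface with both nits addressed: an explicit `close()` and a `punctuate` that takes a parameter. Treating that parameter as the current stream time, and the `process` signature, are assumptions; the generic parameters mirror `Processor<K1,V1,K2,V2>` from the KIP, with `K2`/`V2` as output types that this sketch does not use.

```java
/**
 * Hypothetical Processor shape with both nits applied. K1/V1 are input types;
 * K2/V2 are output types (unused in this sketch).
 */
interface Processor<K1, V1, K2, V2> {
    void process(K1 key, V1 value);

    /** Called periodically; the parameter is assumed to be the current stream time. */
    void punctuate(long streamTime);

    /** Release resources (state stores, buffers) when the task shuts down. */
    void close();
}

/** Example implementation: counts records and remembers the last punctuation time. */
final class CountingProcessor implements Processor<String, String, String, Long> {
    long count = 0;
    long lastPunctuate = -1;
    boolean closed = false;

    @Override
    public void process(String key, String value) { count++; }

    @Override
    public void punctuate(long streamTime) { lastPunctuate = streamTime; }

    @Override
    public void close() { closed = true; }
}
```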

> Thanks,
> Yan

-- Guozhang
