kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS] KIP-95: Incremental Batch Processing for Kafka Streams
Date Thu, 01 Dec 2016 00:17:27 GMT
Right now, there is only one config value and I am open to better
suggestions.

We did not go for batch.mode=true/false because we might want to have
auto stop at specific stop-offsets or stop-timestamp later on. So we can
extend the parameter with new values like autostop.at=timestamp in
combination with a new parameter that does define the stop timestamp
that gets applied over all input partitions.
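
To make the extension idea concrete, here is a minimal Java sketch of how such a setting could be parsed. Only the key autostop.at and the values eol and timestamp come from this discussion. The second key autostop.timestamp.ms, the class AutoStopConfig, and the StopPolicy enum are hypothetical names chosen for illustration; none of this is an existing Kafka Streams API.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical holder for the proposed "autostop.at" setting (illustration only).
final class AutoStopConfig {
    enum StopPolicy { NONE, EOL, TIMESTAMP }

    final StopPolicy policy;
    final Optional<Long> stopTimestampMs;

    private AutoStopConfig(StopPolicy policy, Optional<Long> stopTimestampMs) {
        this.policy = policy;
        this.stopTimestampMs = stopTimestampMs;
    }

    static AutoStopConfig parse(Properties props) {
        String value = props.getProperty("autostop.at");
        if (value == null) {
            // Parameter not set: run as a normal, never-ending streams app.
            return new AutoStopConfig(StopPolicy.NONE, Optional.empty());
        }
        switch (value) {
            case "eol":
                return new AutoStopConfig(StopPolicy.EOL, Optional.empty());
            case "timestamp":
                // Assumed companion key; the thread only says "a new parameter".
                String ts = props.getProperty("autostop.timestamp.ms");
                if (ts == null) {
                    throw new IllegalArgumentException(
                        "autostop.at=timestamp requires autostop.timestamp.ms");
                }
                return new AutoStopConfig(StopPolicy.TIMESTAMP, Optional.of(Long.parseLong(ts)));
            default:
                throw new IllegalArgumentException("unknown autostop.at value: " + value);
        }
    }
}
```

With a single key and an enum of policies, adding another "where to stop" value later would only mean adding a case, which is the reason for not using a boolean.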

Of course, there are other ways to do this extension with different
"where to stop" policies, too. However, using only "batch.mode" as the
parameter name right now also has the disadvantage of being less
self-descriptive than "autostop.at=eol" -- it is not immediately clear
what "batch.mode" means or that it will stop at EOL.
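
For readers unfamiliar with what "stop at EOL" implies, here is a rough Java sketch of the stop condition, under the assumption that the end offset of every input partition is captured once at startup and that a run is finished when each partition's position has reached its captured offset. The class EolStopCheck and the use of plain strings as partition names are hypothetical and only keep the example self-contained; a real application would obtain these values from the consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: decides whether a batch run has reached its fixed stop offsets.
final class EolStopCheck {
    // Partition name -> end offset captured at startup (offset of the next record to be written).
    private final Map<String, Long> stopOffsets;

    EolStopCheck(Map<String, Long> endOffsetsAtStartup) {
        // Copy so that later changes to the caller's map cannot move the stop point.
        this.stopOffsets = new HashMap<>(endOffsetsAtStartup);
    }

    // currentPositions: partition name -> offset of the next record to be consumed.
    boolean isDone(Map<String, Long> currentPositions) {
        for (Map.Entry<String, Long> entry : stopOffsets.entrySet()) {
            long position = currentPositions.getOrDefault(entry.getKey(), 0L);
            if (position < entry.getValue()) {
                return false; // this partition still has data below its stop offset
            }
        }
        return true;
    }
}
```

Data written after startup lands at or beyond the captured offsets, so it is left for the next run.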

But as I said; I am more than happy to change this to a better name.


On 11/30/16 3:53 PM, Sriram Subramanian wrote:
> I agree that the metadata topic is required to build a batching semantic
> that is intuitive.
> One question on the config -
> autostop.at
> I see one value for it - eol. What other values can be used? Instead, why
> would we not have batch.mode=true/false?
> On Wed, Nov 30, 2016 at 1:51 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>> Both types of intermediate topics are handled the exact same way and
>> both types do connect different subtopologies (even if the user might
>> not be aware that there are multiple subtopologies in case of internal
>> data repartitioning). So there is no distinction between user
>> intermediate topics (via through()) and internal intermediate
>> repartitioning topics.
>> I also do not understand your argument about "coupling instances". The
>> only "synchronization" is at startup time until the marker is written.
>> Afterwards all instances just run as always. Furthermore, the metadata
>> topic will be written by the leader while computing the overall
>> partition assignment. Thus, the metadata topic will be fully populated
>> (including the marker) before the individual instances receive their
>> assignment via the group management protocol. So there is no more
>> "synchronization" than before, as group management synchronizes
>> instances at startup anyway.
>> About startup failure. Yes, there is the case that the leader could
>> potentially fail before the marker gets written. For this case, we have
>> to consider a few things:
>> 1. the net effect is, that no data will be processed by any instance
>>    (so application can start up, because no partition assignment will be
>> distributed via group management, as the leader did fail while computing
>> the assignment)
>> 2. the failure would occur during partition assignment, which would be a
>> severe failure anyway, and the application has bigger problems than a
>> missing marker in the metadata topic (nobody will get partitions
>> assigned as the leader did not finish the assignment computation)
>> 3. if the leader fails, a different instance will become the leader.
>>    a) thus, if it is a permanent problem, eventually all instances are
>> going down
>>    b) if the problem is transient, the probability is very high that the
>> new leader will not fail
>> -Matthias
>> On 11/30/16 1:21 PM, Eno Thereska wrote:
>>> In the KIP, two types of intermediate topics are described: 1) ones that
>>> connect two sub-topologies, and 2) others that are internal repartitioning
>>> topics (e.g., for joins).
>>> I wasn't envisioning stopping the consumption of (2) at the HWM. The HWM
>>> can be used for the source topics only (so I agree with your "joins"
>>> scenario, but for a different reason).
>>> The case I am worried about is (1), when there are implicit connections
>>> between application instances where a 2nd instance's source topics would be
>>> the 1st instance's output topics. In that case I was suggesting not to
>>> couple those instances.
>>> In the (corner) case when the application fails repeatedly, it can still
>>> fail right before we write to the metadata topic, so that corner case can
>>> still happen. However, it should be extremely rare, and I'd argue if the
>>> app is failing repeatedly N times there are bigger problems with the app.
>>> Eno
>>>> On 30 Nov 2016, at 11:52, Damian Guy <damian.guy@gmail.com> wrote:
>>>> I think the KIP looks good. I also think we need the metadata topic
>>>> in order to provide sane guarantees on what data will be processed.
>>>> As Matthias has outlined in the KIP, we need to know when to stop consuming
>>>> from intermediate topics, i.e., topics that are part of the same application
>>>> but are used for re-partitioning or through etc. Without the metadata topic
>>>> the consumption from the intermediate topics would always be one run
>>>> behind. In the case of a join requiring partitioning this would result in
>>>> no output for the first run and then in subsequent runs you'd get the
>>>> output from the previous run - I'd find this a bit odd.
>>>> Also, I think having a fixed HWM is a good idea. If you are running your
>>>> streams app in some shared environment, then you don't want to get into a
>>>> situation where the app fails (for random reasons), restarts with a new
>>>> HWM, fails, restarts... and then continues to consume resources forever as
>>>> the HWM is constantly moving forward. So I think the approach in the KIP
>>>> helps batch-mode streams apps to be good citizens when running in shared
>>>> environments.
>>>> Thanks,
>>>> Damian
>>>> On Wed, 30 Nov 2016 at 10:40 Eno Thereska <eno.thereska@gmail.com> wrote:
>>>>> Hi Matthias,
>>>>> I like the first part of the KIP. However, the second part with the
>>>>> failure modes and metadata topic is quite complex and I'm worried it
>>>>> doesn't solve the problems you mention under failure. For example, the
>>>>> application can fail before writing to the metadata topic. In that case, it
>>>>> is not clear what the second app instance should do (for the handling of
>>>>> intermediate topics case). So in general, we have the problem of failures
>>>>> during writes to the metadata topic itself.
>>>>> Also, for the intermediate topics example, I feel like we are trying to
>>>>> provide some sort of synchronisation between app instances with this
>>>>> approach. By default today such synchronisation does not exist. One
>>>>> instance writes to the intermediate topic, and the other reads from it,
>>>>> but only eventually. That is a nice way to decouple instances in my opinion.
>>>>> The user can always run the batch processing multiple times and eventually
>>>>> all instances will produce some output. The user's app can check whether
>>>>> the output size is satisfactory and then not run any further loops. So I
>>>>> feel they can already get a lot with the simpler first part of the KIP.
>>>>> Thanks
>>>>> Eno
>>>>>> On 30 Nov 2016, at 05:45, Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>> Thanks for your input.
>>>>>> To clarify: the main reason to add the metadata topic is to cope with
>>>>>> subtopologies that are connected via an intermediate topic (either
>>>>>> user-defined via through() or internally created for data repartitioning).
>>>>>> Without this handling, the behavior would be odd and the user experience
>>>>>> would be bad.
>>>>>> Thus, using the metadata topic to have a "fixed HW" is just a small
>>>>>> add-on -- and more or less for free, because the metadata topic is
>>>>>> already there.
>>>>>> -Matthias
>>>>>> On 11/29/16 7:53 PM, Neha Narkhede wrote:
>>>>>>> Thanks for initiating this. I think this is a good first step towards
>>>>>>> unifying batch and stream processing in Kafka.
>>>>>>> I understood this capability to be simple yet very useful; it allows a
>>>>>>> Streams program to process a log, in batch, in arbitrary windows defined by
>>>>>>> the difference between the HW and the current offset. Basically, it
>>>>>>> provides a simple means for a Streams program to "stop" after processing a
>>>>>>> batch (just like a batch program would) and continue where it left
>>>>>>> off when restarted. In other words, it allows batch processing behavior for
>>>>>>> a Streams app without code changes.
>>>>>>> This feature is useful but I do not think there is a necessity to add a
>>>>>>> metadata topic. After all, the user doesn't really care as much about
>>>>>>> exactly where the batch ends. This feature allows an app to "process as
>>>>>>> much as there is data to process" and the way it determines how much data
>>>>>>> there is to process is by reading the HW on startup. If there is new data
>>>>>>> written to the log right after it starts up, it will process it when
>>>>>>> restarted the next time. If it starts, reads HW but fails, it will restart
>>>>>>> and process a little more before it stops again. The fact that the HW
>>>>>>> changes in some scenarios isn't an issue since a batch program that behaves
>>>>>>> this way doesn't really care exactly what that HW is.
>>>>>>> There might be cases which require adding more topics but I would stay away
>>>>>>> from adding complexity wherever possible as it complicates operations and
>>>>>>> reduces simplicity.
>>>>>>> Other than this issue, I'm +1 on adding this feature. I think it is pretty
>>>>>>> powerful.
>>>>>>> On Mon, Nov 28, 2016 at 10:48 AM Matthias J. Sax <matthias@confluent.io>
>>>>>>> wrote:
>>>>>>>> Hi all,
>>>>>>>> I want to start a discussion about KIP-95:
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>>>>>>> Looking forward to your feedback.
>>>>>>>> -Matthias
>>>>>>>> --
>>>>>>> Thanks,
>>>>>>> Neha
