kafka-dev mailing list archives

From "Scott Carey (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-156) Messages should not be dropped when brokers are unavailable
Date Wed, 03 Apr 2013 22:35:14 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13621433#comment-13621433 ]

Scott Carey commented on KAFKA-156:

I am positive that the producer wire protocol must have built-in features to prevent messages
from being dropped when brokers are unavailable. There is no way to achieve 'optimal
transmission' without two-phase commit or idempotence between the producer and the broker.
I define 'optimal transmission' as the guarantee that data is neither duplicated nor lost after
some well-known point has been reached, as viewed by the producer. Before this point (for
example, while the message sits in an in-memory queue), no system can offer any guarantee.

"FWIW Clearspring has a pipeline with: ConcurrentQueue --> spill to disk queue with max
size (then drops messages) --> SyncProducer with retry/backoff. "
Such a system can come close: at worst it loses or duplicates a single 'batch' of messages,
where a batch is one or more messages. Even at best, when reading back the data spilled to
disk, a crash at either end between sending a batch and receiving its acknowledgement leaves
that batch in limbo. The batch needs an identifier that both sides can persist or generate, so
that it can be identified when one side has to recover from a crash (two-phase commit). Many
database systems support this (see http://www.postgresql.org/docs/9.2/static/sql-prepare-transaction.html):
as a client you can name a transaction, and after you receive the acknowledgement for the
prepare step, you log that the transaction has been prepared and send the commit command. If
the client crashes before getting the commit acknowledgement, on recovery it looks up the
identifier of the in-flight commit and asks the system whether the commit succeeded.
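A minimal sketch of the named-transaction recovery described above, assuming an in-memory stand-in for the remote system. `Broker`, `send_batch`, `recover`, and the JSON journal file are hypothetical illustrations, not a Kafka or PostgreSQL API. The client journals that a batch is prepared before it commits, so after a simulated crash it can ask the system for the status of that identifier instead of guessing.

```python
import json
import os
import tempfile


class Broker:
    """Hypothetical in-memory stand-in for a system with named two-phase commits."""

    def __init__(self):
        self.prepared = {}      # txn_id -> batch staged but not yet visible
        self.committed = set()  # txn_ids whose batches are visible
        self.log = []           # committed messages, in commit order

    def prepare(self, txn_id, batch):
        if txn_id not in self.committed:
            self.prepared[txn_id] = list(batch)

    def commit(self, txn_id):
        if txn_id in self.committed:
            return  # repeating a commit is a no-op
        self.log.extend(self.prepared.pop(txn_id))
        self.committed.add(txn_id)

    def status(self, txn_id):
        if txn_id in self.committed:
            return "committed"
        return "prepared" if txn_id in self.prepared else "unknown"


class CrashAfterCommit(Exception):
    """Simulates the client dying before it sees the commit acknowledgement."""


def write_journal(path, entry):
    # Write a temp file, then rename, so the journal is never half-written.
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(entry, f)
    os.replace(tmp, path)


def read_journal(path):
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)


def send_batch(broker, journal, txn_id, batch, crash=False):
    broker.prepare(txn_id, batch)  # phase 1, under a client-chosen name
    # Record "prepared" durably before issuing the commit.
    write_journal(journal, {"txn_id": txn_id, "batch": batch, "state": "prepared"})
    broker.commit(txn_id)          # phase 2
    if crash:
        raise CrashAfterCommit()
    write_journal(journal, {"txn_id": txn_id, "batch": batch, "state": "done"})


def recover(broker, journal):
    entry = read_journal(journal)
    if entry is None or entry["state"] == "done":
        return
    # Ask the system what happened to the in-flight transaction.
    status = broker.status(entry["txn_id"])
    if status == "unknown":
        broker.prepare(entry["txn_id"], entry["batch"])
    if status != "committed":
        broker.commit(entry["txn_id"])
    write_journal(journal, dict(entry, state="done"))


broker = Broker()
journal = os.path.join(tempfile.mkdtemp(), "journal.json")
try:
    send_batch(broker, journal, "batch-0001", ["a", "b"], crash=True)
except CrashAfterCommit:
    pass
recover(broker, journal)
recover(broker, journal)  # running recovery again is harmless
print(broker.log)  # ['a', 'b']
```

Because the commit is keyed by a name both sides know, recovery never has to choose blindly between resending (risking a duplicate) and dropping (risking a loss).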

We have an internal system that we are attempting to replace with Kafka, but Kafka does not
guarantee delivery the way our system does. We spool data on our producers into batches (one
file per batch) and then transfer these batches to the downstream system, which stores them in
a staging area so that recovery is simple if either side crashes before the transfer completes.
Once the batch (which is uniquely named) has been validated as identical on both sides, the
producer can remove it locally and the batch is promoted atomically from the staging area to
the completed area. This is again safe if either side crashes, since an item in the staging
area that no longer exists on the producer must already have been transferred successfully.
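The staging-area scheme above can be sketched with plain files, assuming all three directories live on one local filesystem (in the real system the staging and completed areas sit on the receiver). The names `transfer`, `recover`, and `digest` are hypothetical; SHA-256 comparison stands in for "validating that the batch is identical on both sides".

```python
import hashlib
import os
import shutil
import tempfile


def digest(path):
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


def transfer(name, spool, staging, completed):
    """Hypothetical: move one uniquely named batch file from producer spool to receiver."""
    src = os.path.join(spool, name)
    staged = os.path.join(staging, name)
    # 1. Copy into staging; safe to repeat if a previous attempt crashed.
    shutil.copyfile(src, staged)
    # 2. The producer may forget the batch only once both copies match.
    if digest(src) != digest(staged):
        os.remove(staged)
        raise OSError("batch %s differs after transfer" % name)
    os.remove(src)
    # 3. Promote with a rename, which is atomic within one POSIX filesystem.
    os.replace(staged, os.path.join(completed, name))


def recover(spool, staging, completed):
    """Resolve batches that a crash on either side left in staging."""
    for name in os.listdir(staging):
        if os.path.exists(os.path.join(spool, name)):
            # Producer still holds it: not yet confirmed, so redo the transfer.
            transfer(name, spool, staging, completed)
        else:
            # Producer already deleted it, so it was validated: just promote.
            os.replace(os.path.join(staging, name), os.path.join(completed, name))


root = tempfile.mkdtemp()
spool, staging, completed = (os.path.join(root, d) for d in ("spool", "staging", "completed"))
for d in (spool, staging, completed):
    os.makedirs(d)

for name, data in (("batch-0001", b"m1\nm2\n"), ("batch-0002", b"m3\n")):
    with open(os.path.join(spool, name), "wb") as f:
        f.write(data)

# Simulate a crash after batch-0001 was validated and deleted locally, before promotion.
shutil.copyfile(os.path.join(spool, "batch-0001"), os.path.join(staging, "batch-0001"))
os.remove(os.path.join(spool, "batch-0001"))
recover(spool, staging, completed)

transfer("batch-0002", spool, staging, completed)  # normal, crash-free path
print(sorted(os.listdir(completed)))  # ['batch-0001', 'batch-0002']
```

The ordering is what makes this safe: the producer deletes its copy only after validation, so "present in staging, absent on the producer" can only mean the batch is good.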

Kafka will have to mimic this sort of safety at each stage. On the consumer side, the topic,
partition, and offset of a batch together serve as a unique identifier that allows only-once
semantics. Is there an equivalent on the producer side?
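To illustrate how (topic, partition, offset) can act as the consumer-side identifier, here is a minimal sketch of a hypothetical sink that skips redelivered messages. It assumes messages within a partition arrive in offset order and that the sink's output and its offset bookkeeping are persisted together atomically; `IdempotentSink` is not a Kafka client API.

```python
class IdempotentSink:
    """Hypothetical consumer-side sink that applies each message at most once.

    Assumes messages within a partition arrive in offset order, and that
    `results` and `next_offset` are persisted together atomically.
    """

    def __init__(self):
        self.results = []
        self.next_offset = {}  # (topic, partition) -> first offset not yet applied

    def apply(self, topic, partition, offset, value):
        key = (topic, partition)
        if offset < self.next_offset.get(key, 0):
            return False  # redelivered: (topic, partition, offset) already applied
        # In a real system these two updates must commit in one transaction.
        self.results.append(value)
        self.next_offset[key] = offset + 1
        return True


sink = IdempotentSink()
deliveries = [
    ("events", 0, 0, "a"),
    ("events", 0, 1, "b"),
    ("events", 0, 1, "b"),  # redelivered after a consumer restart
    ("events", 1, 0, "c"),
]
applied = [sink.apply(*d) for d in deliveries]
print(sink.results)  # ['a', 'b', 'c']
print(applied)       # [True, True, False, True]
```

The producer side has no such naturally unique coordinate before the broker assigns offsets, which is the gap the question above points at.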

Replication mitigates the problem significantly, but unless the broker hands out a unique id
for each batch (I am unsure whether it does), an item can still be dropped or duplicated when
a transient network failure occurs that TCP/IP does not handle.

If messages are spooled to disk when a broker is unavailable, reading those items back from
the log and sending them to the broker without loss or duplication is tricky. Each batch of
messages needs an identifier shared between the broker and the producer, and the batch must be
marked with that identifier durably on disk before it is sent to the broker. After
acknowledgement, the producer can delete the batch or mark it complete. If the producer crashes
between sending the batch and receiving a response (or otherwise fails to get an
acknowledgement), it must be able to ask the broker whether the batch with the given
identifier was received; alternatively, it can send the batch again and the broker will ignore
the duplicate based on the identifier.
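The second option above (resend, and let the broker drop duplicates by identifier) can be sketched as follows. This assumes the producer generates the batch id itself with a UUID, which is one possible choice rather than anything the Kafka protocol defines; `DedupBroker`, `spool`, and `drain` are hypothetical names.

```python
import json
import os
import tempfile
import uuid


class DedupBroker:
    """Hypothetical broker that remembers the batch ids it has already appended."""

    def __init__(self):
        self.log = []
        self.seen = set()

    def produce(self, batch_id, messages):
        if batch_id in self.seen:
            return "duplicate"  # acknowledged, but not appended a second time
        self.log.extend(messages)
        self.seen.add(batch_id)
        return "ok"


def spool(spool_dir, messages):
    """Assign an id and write the batch durably before any send attempt."""
    batch_id = str(uuid.uuid4())
    tmp = os.path.join(spool_dir, batch_id + ".tmp")
    with open(tmp, "w") as f:
        json.dump({"id": batch_id, "messages": messages}, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, os.path.join(spool_dir, batch_id + ".batch"))
    return batch_id


def drain(spool_dir, broker, crash_before_ack=False):
    """Send every spooled batch; delete each one only after it is acknowledged."""
    for name in sorted(os.listdir(spool_dir)):
        if not name.endswith(".batch"):
            continue  # skip half-written .tmp files
        path = os.path.join(spool_dir, name)
        with open(path) as f:
            batch = json.load(f)
        broker.produce(batch["id"], batch["messages"])
        if crash_before_ack:
            return  # simulated crash: the response is lost, the file stays
        os.remove(path)


spool_dir = tempfile.mkdtemp()
broker = DedupBroker()
spool(spool_dir, ["m1", "m2"])
drain(spool_dir, broker, crash_before_ack=True)  # broker appended; producer saw no ack
drain(spool_dir, broker)                          # restart: resend, broker drops the duplicate
print(broker.log)  # ['m1', 'm2']
```

In a real broker the set of seen ids would itself have to be durable and bounded (for example, kept per producer for a limited window).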

Does the producer wire protocol include batch ids generated by the broker, so that this can be
implemented? It does not seem to, judging by the Produce API section here:
https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-ProduceAPI
As described there, the protocol does not seem able to support "only once" message semantics.

> Messages should not be dropped when brokers are unavailable
> -----------------------------------------------------------
>                 Key: KAFKA-156
>                 URL: https://issues.apache.org/jira/browse/KAFKA-156
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Sharad Agarwal
>             Fix For: 0.8
> When none of the brokers is available, the producer should spool the messages to disk and
> keep retrying until the brokers come back.
> This will also enable broker upgrades/maintenance without message loss.

This message is automatically generated by JIRA.
