kafka-dev mailing list archives

From Jay Kreps <jay.kr...@gmail.com>
Subject New Producer Public API
Date Fri, 24 Jan 2014 19:54:13 GMT
As mentioned in a previous email we are working on a re-implementation of
the producer. I would like to use this email thread to discuss the details
of the public API and the configuration. I would love for us to be
incredibly picky about this public API now so that it is as good as
possible and we don't need to break it in the future.

The best way to get a feel for the API is actually to take a look at the
javadoc. My hope is to make the API docs good enough that they are
self-explanatory:
http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html

Please take a look at this API and give me any thoughts you may have!

It may also be reasonable to take a look at the configs:
http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html

The actual code is posted here:
https://issues.apache.org/jira/browse/KAFKA-1227

A few questions or comments to kick things off:
1. We need to make a decision on whether serialization of the user's key
and value should be done by the user (with our api just taking byte[]) or
if we should take an object and allow the user to configure a Serializer
class which we instantiate via reflection. We take the latter approach in
the current producer, and I have carried this through to this prototype.
The tradeoff I see is this: taking byte[] is actually simpler, since the
user can directly do whatever serialization they like. The complication is
partitioning. Currently partitioning is done by a similar plug-in API
(Partitioner) which the user can implement and configure to override how
partitions are assigned. If we take byte[] as input then we have no access
to the original object and partitioning MUST be done on the byte[]. This is
fine for hash partitioning. However, for various types of semantic
partitioning (range partitioning, or whatever) you would want access to the
original object. In the current approach, a producer who wishes to send
byte[] they have serialized in their own code can configure the
BytesSerialization we supply, which is just a "no-op" serialization.
2. We should obsess over naming and make sure each of the class names is
good.
3. Jun has already pointed out that we need to include the topic and
partition in the response, which is absolutely right. I haven't done that
yet, but it definitely needs to be there.
4. Currently RecordSend.await throws an exception if the request
failed. The intention here is that producer.send(message).await() exactly
simulates a synchronous call. Guozhang has noted that this is a little
annoying since the user must then catch exceptions. However, if we remove
this, a user who doesn't check for errors won't know one has occurred,
which I predict will be a common mistake.
5. Perhaps there is more we could do to make the async callbacks and future
we give back intuitive and easy to program against?
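A minimal sketch of the tradeoff in point 1, assuming hypothetical `Serializer<T>` and `Partitioner<K>` interfaces (illustrative names and signatures only, not the prototype's actual API). Hash partitioning works fine on opaque bytes, but the range partitioner below needs the original key object:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitioningSketch {
    // Hypothetical plug-in interfaces, for illustration only.
    interface Serializer<T> {
        byte[] toBytes(T value);
    }

    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // "No-op" serializer for users who already serialize in their own code.
    static final class BytesSerializer implements Serializer<byte[]> {
        public byte[] toBytes(byte[] value) {
            return value;
        }
    }

    // Hash partitioning only needs the serialized bytes.
    static final class HashPartitioner implements Partitioner<byte[]> {
        public int partition(byte[] key, int numPartitions) {
            // Clear the sign bit so the modulus is never negative.
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
    }

    // Semantic (range) partitioning needs the original object: non-negative
    // user ids in [0, maxUserId) are split into contiguous, equal-width ranges.
    static final class UserIdRangePartitioner implements Partitioner<Long> {
        private final long maxUserId;

        UserIdRangePartitioner(long maxUserId) {
            this.maxUserId = maxUserId;
        }

        public int partition(Long userId, int numPartitions) {
            long width = (maxUserId + numPartitions - 1) / numPartitions; // ceiling division
            return (int) Math.min(userId / width, numPartitions - 1);
        }
    }

    public static void main(String[] args) {
        byte[] key = new BytesSerializer().toBytes("user-42".getBytes(StandardCharsets.UTF_8));
        System.out.println(new HashPartitioner().partition(key, 8));
        UserIdRangePartitioner range = new UserIdRangePartitioner(1_000_000L);
        System.out.println(range.partition(42L, 4));      // 0
        System.out.println(range.partition(999_999L, 4)); // 3
    }
}
```

If the producer accepted only byte[], something like `UserIdRangePartitioner` could not be written without deserializing the key again inside the partitioner.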

Some background info on implementation:

At a high level, the primary difference in this producer is that it removes
the distinction between the "sync" and "async" producer. Effectively all
requests are sent asynchronously but always return a future response object
that gives the offset as well as any error that may have occurred when the
request is complete. The batching that today is done only in the async
producer is now done whenever possible. This means that the sync producer,
under load, can get performance as good as the async producer (preliminary
results show the producer getting 1 million messages/sec). This works
similarly to group commit in databases, but with respect to the actual
network transmission: any messages that arrive while a send is in progress
are batched together. It is also possible to encourage batching even under
low load to save server resources by introducing a delay on the send to
allow more messages to accumulate; this is done using the linger.ms config
(this is similar to Nagle's algorithm in TCP).
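A rough sketch of the batching and future semantics described above. Every name here (`Transport`, `RecordSend`, `lingerMs`, the single sender thread) is a hypothetical stand-in chosen for illustration, not the prototype's code: `send` only enqueues and returns a handle, a background thread waits `lingerMs` before draining the queue (in the spirit of linger.ms), anything enqueued while a batch is in flight goes out together in the next batch, and `await()` either returns the record's offset or rethrows the failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class LingerBatchingSketch {
    // Hypothetical network layer: sends one batch, returns the offset of its first record.
    interface Transport {
        long sendBatch(List<byte[]> batch) throws Exception;
    }

    // Hypothetical per-record response handle (loosely modeled on RecordSend).
    static final class RecordSend {
        final CompletableFuture<Long> result = new CompletableFuture<>();

        // Blocks until the batch completes, then returns the offset or rethrows
        // the error, so send(value).await() behaves like a synchronous call.
        long await() throws Exception {
            try {
                return result.get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof Exception) throw (Exception) cause;
                throw e;
            }
        }
    }

    private final Transport transport;
    private final long lingerMs;
    private final List<byte[]> pending = new ArrayList<>();
    private final List<RecordSend> pendingSends = new ArrayList<>();
    private final Thread sender = new Thread(this::runSender);
    private boolean closed = false; // guarded by this

    LingerBatchingSketch(Transport transport, long lingerMs) {
        this.transport = transport;
        this.lingerMs = lingerMs;
        sender.setDaemon(true);
        sender.start();
    }

    // Never touches the network: enqueue the record and hand back a future.
    synchronized RecordSend send(byte[] value) {
        if (closed) throw new IllegalStateException("producer closed");
        RecordSend handle = new RecordSend();
        pending.add(value);
        pendingSends.add(handle);
        notifyAll();
        return handle;
    }

    private void runSender() {
        try {
            while (true) {
                synchronized (this) {
                    while (pending.isEmpty() && !closed) wait();
                    if (pending.isEmpty()) return; // closed and fully drained
                }
                Thread.sleep(lingerMs); // linger: let more records join this batch
                List<byte[]> batch;
                List<RecordSend> handles;
                synchronized (this) {
                    batch = new ArrayList<>(pending);
                    handles = new ArrayList<>(pendingSends);
                    pending.clear();
                    pendingSends.clear();
                }
                // Records sent during this call pile up in pending and go out
                // together next time, like group commit.
                try {
                    long base = transport.sendBatch(batch);
                    for (int i = 0; i < handles.size(); i++) handles.get(i).result.complete(base + i);
                } catch (Exception e) {
                    for (RecordSend h : handles) h.result.completeExceptionally(e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Flushes anything still pending, then stops the sender thread.
    void close() throws InterruptedException {
        synchronized (this) {
            closed = true;
            notifyAll();
        }
        sender.join();
    }

    public static void main(String[] args) throws Exception {
        long[] nextOffset = {0};
        List<Integer> batchSizes = new ArrayList<>();
        LingerBatchingSketch producer = new LingerBatchingSketch(batch -> {
            batchSizes.add(batch.size());
            long first = nextOffset[0];
            nextOffset[0] += batch.size();
            return first;
        }, 5);
        List<RecordSend> handles = new ArrayList<>();
        for (int i = 0; i < 100; i++) handles.add(producer.send(new byte[] {(byte) i}));
        System.out.println(handles.get(99).await()); // 99: offsets follow send order
        producer.close();
        System.out.println(batchSizes); // exact sizes depend on timing
    }
}
```

Since many threads each calling `send(...).await()` still feed one shared queue, their records get batched together under load, which is the sense in which a synchronous caller need not give up the throughput of batching.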

This producer does all network communication asynchronously and in parallel
to all servers, so the performance penalty for acks=-1 and waiting on
replication should be much reduced. I haven't done much benchmarking on
this yet, though.
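The point about parallel sends can be sketched as follows. This only illustrates why overlapping requests helps: the broker names and latencies are made up, `sendAndAwaitReplication` is a hypothetical helper that just sleeps, and the thread pool with blocking calls is used purely for brevity, not as a description of how the new producer's network layer is built.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelSendSketch {
    // Hypothetical stand-in for one produce request with acks=-1: returns once
    // the (simulated) leader and followers have the write.
    static String sendAndAwaitReplication(String broker, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return broker + ":acked";
    }

    // Sends to every broker at once, so the total wait tracks the slowest
    // broker rather than the sum of all brokers' latencies.
    static Map<String, String> sendToAll(Map<String, Long> latencyByBroker) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, latencyByBroker.size()));
        try {
            Map<String, CompletableFuture<String>> inFlight = new TreeMap<>();
            latencyByBroker.forEach((broker, latency) -> inFlight.put(broker,
                    CompletableFuture.supplyAsync(() -> sendAndAwaitReplication(broker, latency), pool)));
            Map<String, String> acks = new TreeMap<>();
            inFlight.forEach((broker, future) -> acks.put(broker, future.join()));
            return acks;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, Long> latency = new TreeMap<>();
        latency.put("broker-1", 100L);
        latency.put("broker-2", 200L);
        latency.put("broker-3", 300L);
        long start = System.nanoTime();
        Map<String, String> acks = sendToAll(latency);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(acks);
        // Expect roughly the slowest broker (about 300 ms), not the sum (600 ms).
        System.out.println("elapsed ~" + elapsedMs + " ms");
    }
}
```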

The high-level design is partly described here, though the page is now a
little out of date:
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

-Jay
