kafka-dev mailing list archives

From "David Arthur (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-643) Refactor api definition layer
Date Wed, 09 Jan 2013 18:40:13 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548797#comment-13548797

David Arthur commented on KAFKA-643:

+1 for splitting generic/specific parts of the API (this is basically what I do in my Python

+1 for specifying the protocol in a ~BNF form. This would require protocols to be specified
as LL grammars (which they all are), which is required for efficient ByteBuffer packing/unpacking.

However, how would this scheme handle recursive definitions (like MessageSet)? I've always
felt the depth of this should be limited to one, meaning a single Message can contain a compressed
MessageSet which can only be composed of regular (uncompressed) Messages. In https://github.com/mumrah/kafka-python/blob/master/kafka/client.py#L355,
I have to recurse indefinitely to ensure I've fully consumed the messages - kind of a pain. If
the depth were limited, I could decode it non-recursively.
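To make the depth-limit point concrete, here is a minimal Scala sketch. It assumes a simplified in-memory model of messages; `Plain`, `Wrapper`, `flattenRecursive`, and `flattenDepthOne` are hypothetical names for illustration, not code from Kafka or kafka-python.

```scala
object MessageSetSketch {
  // Hypothetical, simplified model: a message either carries a payload
  // or wraps an inner (compressed) set of messages.
  sealed trait Message
  final case class Plain(payload: String) extends Message
  final case class Wrapper(inner: Seq[Message]) extends Message

  // With unbounded nesting, the decoder has to recurse.
  def flattenRecursive(set: Seq[Message]): Seq[String] =
    set.flatMap {
      case Plain(p)       => Seq(p)
      case Wrapper(inner) => flattenRecursive(inner)
    }

  // With depth limited to one, a wrapper may only hold plain messages,
  // so two flat passes suffice and deeper nesting is rejected.
  def flattenDepthOne(set: Seq[Message]): Seq[String] =
    set.flatMap {
      case Plain(p) => Seq(p)
      case Wrapper(inner) =>
        inner.map {
          case Plain(p)   => p
          case Wrapper(_) => throw new IllegalArgumentException("nested compressed set")
        }
    }
}
```

Under the depth-one rule both functions return the same payloads in the same order; the second simply turns deeper nesting into an explicit error instead of another level of recursion.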

+0 for not using Avro et al. I understand the performance implications of using one of these
frameworks, but it sure does make client development easier. However, as long as the protocol
spec is clear (and correct) implementing a client is not so bad.

What about the Java API? As far as I can tell, the purpose of these classes is to delegate
to the real APIs and handle Java -> Scala data type conversion. It seems like this conversion
could be automatic/automagic, although I guess the Java classes must be present for the implicits
stuff to work.

I know it's very new (Scala 2.10) and experimental, but macros might help in simplifying the
APIs: http://docs.scala-lang.org/overviews/macros/overview.html.
> Refactor api definition layer
> -----------------------------
>                 Key: KAFKA-643
>                 URL: https://issues.apache.org/jira/browse/KAFKA-643
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
> The way we are defining our protocol is really a bit embarrassing. It is full of ad hoc
serialization code for each API. This code is very fiddly and opaque and when it has errors
they are hard to debug. Since it is all done one-off it is also very easy for it to become
inconsistent. This was tolerable when there were only two apis with a few fields each, but
now there are a half dozen more complex apis. By my count there are now over 1000 lines of
code in kafka.apis.*.
> One option would be to use protocol buffers or thrift or another schema-oriented code
gen RPC language. However I think this is probably the wrong direction for a couple of reasons.
One is that we want something that works well with our I/O model, both network and disk, which
is very NIO-centric. So it should work directly with ByteBuffers. Second I feel that these
systems complicate the specification of the protocol. They give a schema, which is a great
high-level description, but the translation of that to bytes is essentially whatever their
code-gen engine chooses to do. These things are a great way to build application services,
but not great for something like what we are building.
> Instead I think we should do what we have done, specify the protocol as a wiki. However
we should write a little helper code to make our lives easier.
> Here is my recommendation for how this code would work. We add two helper classes: Schema
and Record.
> You define message formats like this:
> import Types._
> val FetchRequestProtocol =
>   Schema("ReplicaId" -> int32,
>          "MaxWaitTime" -> int32,
>          "MinBytes" -> int32,
>          Seq("TopicName" -> utf8,
>              Seq("Partition" -> int32,
>                  "FetchOffset" -> int64,
>                  "MaxBytes" -> int32)))
> Note that this almost exactly matches the BNF for the fetch request: 
>   https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> Once defined this schema can be used to parse messages:
>   val record: Record = FetchRequestProtocol.readFrom(buffer)
> A record is just a wrapper around an array. The readFrom method parses out the fields
specified in the schema and populates the array. Fields in the record can be accessed by name,
>   record("ReplicaId")
> For common access this is probably good enough. However since the position is fixed,
it is also possible to get the element by a Field object, which gets rid of the hashmap lookup
and goes directly to the right slot. E.g.
>   val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a global variable
>   ...
>   record(ReplicaIdField)
> This will be for cases where we are a bit performance conscious and don't want to do
umpteen hashmap lookups to resolve string field names.
> Likewise the other direction, to write out a record:
>   record.writeTo(buffer)
> and to get the size in bytes:
>   record.size
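The Schema and Record helpers described above could look roughly like the following. This is a minimal sketch under stated assumptions, not the proposed implementation: it supports only flat schemas (no nested `Seq` arrays), the type objects are named `Int32`, `Int64`, and `Utf8` rather than the lowercase names used above, and strings are assumed to be encoded as an int16 length followed by UTF-8 bytes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object SchemaSketch {
  // A field type knows how to read, write, and size one value.
  sealed trait Type {
    def read(buf: ByteBuffer): Any
    def write(buf: ByteBuffer, value: Any): Unit
    def sizeOf(value: Any): Int
  }
  case object Int32 extends Type {
    def read(buf: ByteBuffer): Any = buf.getInt()
    def write(buf: ByteBuffer, value: Any): Unit = buf.putInt(value.asInstanceOf[Int])
    def sizeOf(value: Any): Int = 4
  }
  case object Int64 extends Type {
    def read(buf: ByteBuffer): Any = buf.getLong()
    def write(buf: ByteBuffer, value: Any): Unit = buf.putLong(value.asInstanceOf[Long])
    def sizeOf(value: Any): Int = 8
  }
  // Assumed encoding: int16 length prefix, then UTF-8 bytes.
  case object Utf8 extends Type {
    def read(buf: ByteBuffer): Any = {
      val bytes = new Array[Byte](buf.getShort().toInt)
      buf.get(bytes)
      new String(bytes, UTF_8)
    }
    def write(buf: ByteBuffer, value: Any): Unit = {
      val bytes = value.asInstanceOf[String].getBytes(UTF_8)
      buf.putShort(bytes.length.toShort)
      buf.put(bytes)
    }
    def sizeOf(value: Any): Int = 2 + value.asInstanceOf[String].getBytes(UTF_8).length
  }

  // A Field remembers its slot, so lookups by Field skip the name map.
  final case class Field(name: String, tpe: Type, index: Int)

  final class Schema(defs: (String, Type)*) {
    val fields: IndexedSeq[Field] =
      defs.zipWithIndex.map { case ((n, t), i) => Field(n, t, i) }.toIndexedSeq
    private val byName: Map[String, Field] = fields.map(f => f.name -> f).toMap

    def apply(name: String): Field = byName(name)

    // Parse the fields in declaration order and populate the record's array.
    def readFrom(buf: ByteBuffer): Record =
      new Record(this, fields.map(_.tpe.read(buf)).toArray)
  }

  // A record is just a wrapper around an array of field values.
  final class Record(val schema: Schema, val values: Array[Any]) {
    def apply(name: String): Any = values(schema(name).index) // map lookup by name
    def apply(field: Field): Any = values(field.index)        // direct slot access
    def size: Int = schema.fields.map(f => f.tpe.sizeOf(values(f.index))).sum
    def writeTo(buf: ByteBuffer): Unit =
      schema.fields.foreach(f => f.tpe.write(buf, values(f.index)))
  }
}
```

In this sketch a schema is built with `new SchemaSketch.Schema("ReplicaId" -> Int32, "TopicName" -> Utf8)`, and a record written with `writeTo` into a buffer of `record.size` bytes can be read back with `readFrom` after flipping the buffer. Values are stored untyped as `Any`, which keeps the code short but pushes type errors to runtime.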
> Implementing a single read, write, and size method with generic schemas will not only
make the underlying protocol clearly defined but also ensure good error handling, error reporting,
etc. It will be a bit slower, though perhaps not by much, since we can optimize this code.
> I do realize that this is essentially what Avro or Thrift or ProtocolBuffers do, but
I think this is much simpler, and can be implemented in a few hundred lines of code with no
dependencies. Furthermore it is a way to implement our protocol, not a way to define a protocol.
> In terms of how we use this, this is what I have in mind:
> I think we should split the apis into a generic and a specific portion, with the generic
piece being the header shared by all requests and responses, and the specific portion being
the bits for that message. I recommend we officially implement versioning by allowing multiple
versions of the schemas and always looking up the right schema for the incoming and outgoing
messages. I think we can keep the existing case classes, and just map the scala objects to
and from the record instances in a wrapper layer prior to the existing KafkaApis. The KafkaApis.handle
method would disappear and instead this wrapper would handle message deserialization and calling
the right method with the right request object.
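The versioned lookup described above might be sketched as follows. Everything here is an illustrative assumption: `Schema` is a stand-in that only lists field names, the registry contents are invented rather than real protocol versions, and the generic header is assumed to start with an int16 api key followed by an int16 version.

```scala
import java.nio.ByteBuffer

object VersionedDispatch {
  // Stand-in for the proposed Schema class: just an ordered list of field names.
  final case class Schema(fieldNames: String*)

  // Hypothetical registry keyed by (api key, version); the entries are made up.
  val schemas: Map[(Int, Int), Schema] = Map(
    (1, 0) -> Schema("ReplicaId", "MaxWaitTime", "MinBytes"),
    (1, 1) -> Schema("ReplicaId", "MaxWaitTime", "MinBytes", "ExampleNewField")
  )

  // Read the assumed generic header prefix, then pick the schema for the
  // specific portion. Unknown combinations become an error value.
  def schemaFor(buf: ByteBuffer): Either[String, Schema] = {
    val apiKey = buf.getShort().toInt
    val version = buf.getShort().toInt
    schemas.get((apiKey, version))
      .toRight(s"no schema for api $apiKey, version $version")
  }
}
```

A wrapper layer in front of KafkaApis could call something like `schemaFor`, parse the rest of the buffer with the returned schema, and map the resulting record to the existing case classes.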
