camel-users mailing list archives

From Sergey Zhemzhitsky <szh.s...@gmail.com>
Subject Re: Kafka Producer Performance
Date Tue, 02 Aug 2016 10:37:03 GMT
Hi Daniel,

Thanks a lot for the suggestion! I'll give it a try.

With batch consumers everything is pretty clear; with event-driven
consumers it is not.
For example, the AMQP and JMS implementations can prefetch a number of
messages, but cannot batch them out of the box.
So to make the camel-kafka producer send messages obtained from
event-driven consumers in batches, the batches have to be prepared manually
(e.g. by means of the aggregator EIP), even though the downstream component
(camel-kafka) supports batching itself.



On Mon, Aug 1, 2016 at 3:22 PM, Daniel Kulp <dkulp@apache.org> wrote:

> For one of my clients, I ended up not using the splitter in Camel and
> instead used a custom processor that creates an Iterator<byte[]>. This
> will work with the updates to camel-kafka that are included in 2.17.3. In
> my tests, using the Camel splitter like you have would get about 5K-10K
> msg/sec. With this, I get about 200K. However, within Camel it stays a
> single message, so anything in the Camel route that needs to look at each
> line wouldn’t really work.
>
>
>
> from("file://…….")
>     .process(new Processor() {
>         public void process(Exchange exchange) throws Exception {
>             InputStream ins = exchange.getIn().getBody(InputStream.class);
>             exchange.getIn().setBody(new SplitterIterator(ins));
>         }
>     })
>     .to("kafka:brokerAddr?topic=messages"
>         + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer"
>         + "&keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
>
>     class SplitterIterator implements Iterator<byte[]> {
>         final InputStream stream;
>         byte[] next;
>         SplitterIterator(InputStream i) {
>             stream = i;
>             next = readNext();
>         }
>         private byte[] readNext() {
>             ByteArrayOutputStream bout = new ByteArrayOutputStream();
>             try {
>                 int v = stream.read();
>                 while (v != -1 && v != '\n') {
>                     bout.write(v);
>                     v = stream.read();
>                 }
>                 if (bout.size() == 0) {
>                     return null;
>                 }
>                 return bout.toByteArray();
>             } catch (IOException e) {
>                 throw new RuntimeException(e);
>             }
>         }
>
>
>         public boolean hasNext() {
>             return next != null;
>         }
>         public byte[] next() {
>             byte[] tmp = next;
>             next = readNext();
>             return tmp;
>         }
>         @Override
>         public void remove() {
>         }
>     };
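
A note on the iterator above: readNext() returns null whenever the buffer is empty, so an empty line in the input ends the iteration at that point, exactly as end of stream does. The following is a minimal sketch of a variant that tells the two cases apart, using only the JDK. The class name LineBytesIterator and the readAll helper are hypothetical, introduced just for this illustration; whether blank lines should be forwarded to Kafka or filtered out is a separate choice for the route.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical variant of SplitterIterator; the name is illustrative only.
class LineBytesIterator implements Iterator<byte[]> {
    private final InputStream stream;
    private byte[] next;

    LineBytesIterator(InputStream stream) {
        this.stream = stream;
        this.next = readNext();
    }

    // Returns the next line without its trailing '\n', or null only at end of stream.
    private byte[] readNext() {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try {
            int v = stream.read();
            if (v == -1) {
                return null; // nothing left to read
            }
            while (v != -1 && v != '\n') {
                bout.write(v);
                v = stream.read();
            }
            return bout.toByteArray(); // empty array for a blank line
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public byte[] next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        byte[] tmp = next;
        next = readNext();
        return tmp;
    }

    // Helper for the example: collects every line of the text as a String.
    static List<String> readAll(String text) {
        InputStream in = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
        List<String> lines = new ArrayList<>();
        Iterator<byte[]> it = new LineBytesIterator(in);
        while (it.hasNext()) {
            lines.add(new String(it.next(), StandardCharsets.UTF_8));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Three elements are produced; the middle one is the empty line.
        System.out.println(readAll("first\n\nthird\n").size());
    }
}
```

A trailing newline at the end of the input does not produce an extra empty element, because the read that follows it hits end of stream immediately.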
>
>
> On Aug 1, 2016, at 4:38 AM, Sergey Zhemzhitsky <szh.subs@gmail.com> wrote:
>
> Hi Camel Gurus,
>
> I've run into some performance issues with the camel-kafka component while
> migrating it from 2.17.0 to 2.17.1 and then to 2.17.2.
>
> The Camel route is pretty simple and looks like this:
>
> from("file:/var/lib/app/input")
>    .split().simple("\n").streaming()
>        .to("direct:kafka");
> from("direct:kafka")
>    .to("kafka:brokerAddr?topic=messages");
>
> The first issue, with Camel 2.17.0, was the possibility of losing messages
> <https://github.com/apache/camel/blob/camel-2.17.0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L101>.
> Kafka's native producer buffers messages
> <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L468>
> and if the Kafka broker is unavailable, the buffered messages can be lost
> when the route is restarted. Despite that risk, the performance was pretty
> good (~10K rps) thanks to the producer's buffering.
>
> The second issue, with Camel 2.17.1, was that the performance of the Kafka
> producer degraded tremendously (by up to 100 times) because it blocked on
> every message
> <https://github.com/apache/camel/blob/camel-2.17.1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L100>
> (although in that case no messages are lost).
>
> The third issue, with Camel 2.17.2, was that although Camel started using
> async callbacks
> <https://github.com/apache/camel/blob/camel-2.17.2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L180>,
> the performance was still pretty poor because Kafka's native producer was
> not able to buffer more than a single message (because of the synchronous
> direct endpoint).
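
To illustrate why waiting on every message defeats the producer's buffering, here is a toy model in plain Java. It is not the Kafka client and makes no claim about real throughput: ToyBufferingSender, its batchSize parameter and both helper methods are hypothetical names made up for this sketch. It only counts how many "requests" leave the buffer when the caller forces a flush after each message versus when messages are allowed to accumulate.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; this is NOT the Kafka producer API.
class ToyBufferingSender {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private int requests = 0;

    ToyBufferingSender(int batchSize) {
        this.batchSize = batchSize;
    }

    // Queues a message; a request goes out only once the buffer is full.
    void send(String message) {
        buffer.add(message);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends whatever is buffered as one request (no-op when the buffer is empty).
    void flush() {
        if (!buffer.isEmpty()) {
            requests++;
            buffer.clear();
        }
    }

    int requests() {
        return requests;
    }

    // Models a caller that waits for each message before handing over the next one.
    static int requestsWhenBlockingPerMessage(int messages, int batchSize) {
        ToyBufferingSender sender = new ToyBufferingSender(batchSize);
        for (int i = 0; i < messages; i++) {
            sender.send("m" + i);
            sender.flush(); // the buffer never holds more than one message
        }
        return sender.requests();
    }

    // Models a caller that keeps handing over messages and flushes once at the end.
    static int requestsWhenBuffering(int messages, int batchSize) {
        ToyBufferingSender sender = new ToyBufferingSender(batchSize);
        for (int i = 0; i < messages; i++) {
            sender.send("m" + i);
        }
        sender.flush();
        return sender.requests();
    }

    public static void main(String[] args) {
        System.out.println(requestsWhenBlockingPerMessage(1000, 100)); // one request per message
        System.out.println(requestsWhenBuffering(1000, 100));          // one request per full batch
    }
}
```

In this model the blocking caller issues as many requests as there are messages, while the buffering caller issues one per full batch plus one for any remainder.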
>
> I was able to figure out two solutions for these issues:
>
> - using a seda endpoint instead of the direct one (then Kafka's native
> producer is able to buffer messages, but messages can still be lost
> because of the in-memory nature of seda)
>
> - using an aggregator with the direct endpoint (then the route becomes more
> complicated than it should be, the aggregator adds unnecessary delays, and
> why do we need an extra aggregator for batching at all if Kafka's native
> producer already does buffering/batching?)
>
> So the question is: is there any way to let Kafka's native producer buffer
> more than a single message without using the aggregator EIP and without
> losing messages, as can happen with an intermediate seda endpoint?
>
> Kind Regards,
> Sergey
>
>
> --
> Daniel Kulp
> dkulp@apache.org - http://dankulp.com/blog
> Talend Community Coder - http://coders.talend.com
>
>
