kafka-users mailing list archives

From Gerrit Jansen van Vuuren <gerrit...@gmail.com>
Subject Re: cannot replicate topics kafka inconsistent state
Date Tue, 17 Jun 2014 16:16:36 GMT
The network is 10 Gbit and has not given any issues so far. I think it's
extremely unlikely that the network is the cause (all ports are open and all
communication happens on an internal LAN).

I run consumers and producers on the nodes where the brokers are running,
and they consume and produce data at high volumes between the nodes.
While doing the test, I was not running any producers or consumers other
than the test kafka-console-producer and kafka-console-consumer.




On Tue, Jun 17, 2014 at 4:28 PM, Jun Rao <junrao@gmail.com> wrote:

> Is your network stable?
>
> Thanks,
>
> Jun
>
>
> On Tue, Jun 17, 2014 at 1:48 AM, Gerrit Jansen van Vuuren <
> gerritjvv@gmail.com> wrote:
>
> > Hi,
> >
> > I've installed kafka 2.8.1,
> > created a topic using:
> >
> > /opt/kafka/bin/kafka-topics.sh --create --topic "test" --zookeeper
> > "localhost:2381" --partitions 2 --replication-factor 2
> >
> > Then opened a console producer and a console consumer.
> > I type a few lines on the producer and then the two kafka brokers that
> > should have the two replicas start throwing errors to the logs. The only
> > way to get kafka back to normal again is by deleting all of the topic
> > data in kafka and in zookeeper and restarting.
> >
> > The errors are:
> > broker1:
> >
> > 2014-06-17/01:40:32.137/PDT ERROR [kafka-processor-9092-5]:
> > kafka.network.Processor - Closing socket for /10.101.4.218 because of
> > error^C
> >
> > kafka.common.KafkaException: This operation cannot be completed on a
> > complete request.
> > at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
> > at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
> > at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
> > at kafka.network.Processor.write(SocketServer.scala:375)
> > at kafka.network.Processor.run(SocketServer.scala:247)
> > at java.lang.Thread.run(Thread.java:744)
> >
> >
> > broker2
> >
> > 2014-06-17/01:40:29.127/PDT WARN  [ReplicaFetcherThread-0-215]:
> > kafka.consumer.SimpleConsumer - Reconnect due to socket error: null
> >
> > 2014-06-17/01:40:29.127/PDT ERROR [ReplicaFetcherThread-0-215]:
> > kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-0-215], Error in
> > fetch Name: FetchRequest; Version: 0; CorrelationId: 545271; ClientId:
> > ReplicaFetcherThread-0-215; ReplicaId: 218; MaxWait: 1000 ms; MinBytes: 1
> > bytes; RequestInfo: [test,1] -> PartitionFetchInfo(1,2147483647)
> >
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:376)
> > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> >
> > The same error on each is repeated in an endless loop.
> >
> >
> > My server config is:
> >
> > ----------------------------------------
> >
> > num.network.threads=24
> >
> > num.io.threads=24
> >
> > socket.send.buffer.bytes=10485760
> >
> > socket.receive.buffer.bytes=1048576
> >
> > socket.request.max.bytes=524288000
> >
> > replica.lag.max.messages=5000000
> >
> > replica.fetch.max.bytes=2097152
> >
> > replica.fetch.wait.max.ms=1000
> >
> > log.dir=/data
> >
> > num.partitions=12
> >
> > log.flush.interval.messages=200000
> >
> > log.flush.interval.ms=2000
> >
> > log.retention.hours=168
> >
> > log.retention.mins=10080
> >
> > log.retention.hours=168
> >
> > log.retention.mins=10080
> >
> > log.retention.bytes=2199023255552
> >
> > replica.fetch.max.bytes=2147483647
> >
> > log.segment.bytes=209715200
> >
> > log.cleanup.interval.mins=10
> >
> > default.replication.factor=2
> >
> > zookeeper.connect=localhost:2381
> >
> > zookeeper.connection.timeout.ms=1000000
> >
> > ----------
> >
> >
> > Am I missing some configuration properties?
> >
> >
> > Regards,
> >
> >  Gerrit
> >
>
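
A side note on the quoted server config: it sets replica.fetch.max.bytes
twice (2097152, then 2147483647) and repeats the log.retention.* lines. If
the broker loads the file with standard Java properties semantics (an
assumption here, not something confirmed in this thread), the later value
wins, which would be consistent with the 2147483647 fetch size shown in the
broker2 log. Whether that has anything to do with the errors is not
established. A minimal Python sketch for spotting such repeats, assuming a
plain key=value properties file; the function name find_duplicate_keys and
the shortened sample are made up for illustration:

```python
from collections import defaultdict


def find_duplicate_keys(text):
    """Return {key: [values in file order]} for keys set more than once."""
    seen = defaultdict(list)
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and anything that is not key=value.
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, value = line.split("=", 1)
        seen[key.strip()].append(value.strip())
    return {k: v for k, v in seen.items() if len(v) > 1}


# Shortened excerpt of the config quoted above (illustrative only).
sample = """\
replica.fetch.max.bytes=2097152
log.retention.hours=168
log.retention.hours=168
replica.fetch.max.bytes=2147483647
log.segment.bytes=209715200
"""

for key, values in find_duplicate_keys(sample).items():
    print(f"{key}: set {len(values)} times, last value {values[-1]}")
```

Running the same function over the full server.properties file would list
every repeated key, so the intended value can be kept and the other removed.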
