kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Error in Kafka Stream
Date Mon, 18 Jun 2018 01:15:30 GMT
Hello Amandeep,

What file system are you using? Also, is `/opt/info` a temp folder that may
be cleared automatically from time to time?
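
If it is, one thing to try is pointing `state.dir` at a directory that
nothing else cleans up. Below is a minimal sketch, assuming plain
`java.util.Properties` with the standard `application.id`,
`bootstrap.servers` and `state.dir` keys. The class name, the helper names
and the `/var/lib/kafka-streams` default are hypothetical and not taken from
your setup:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

public class StateDirSketch {

    // Builds the properties a Streams application would be started with,
    // setting an explicit state directory instead of relying on a default.
    static Properties streamsProps(String applicationId, String bootstrapServers, Path stateDir) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("state.dir", stateDir.toString());
        return props;
    }

    // Returns true if the directory exists (creating it if needed) and is writable.
    static boolean isUsableStateDir(Path dir) {
        try {
            Files.createDirectories(dir);
        } catch (IOException e) {
            // Covers both "cannot create" and "path exists but is not a directory".
            return false;
        }
        return Files.isDirectory(dir) && Files.isWritable(dir);
    }

    public static void main(String[] args) {
        // Hypothetical default location; pass the real one as the first argument.
        Path stateDir = Paths.get(args.length > 0 ? args[0] : "/var/lib/kafka-streams");
        if (!isUsableStateDir(stateDir)) {
            System.err.println("State directory is not usable: " + stateDir);
            return;
        }
        // "XYZ" is the placeholder bootstrap address from the config dump.
        Properties props = streamsProps("UI-Watchlist-ES-App", "XYZ", stateDir);
        System.out.println("state.dir = " + props.getProperty("state.dir"));
    }
}
```

This only checks the directory once at startup; it would not detect a task
directory being removed later while the application is running.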


Guozhang

On Fri, Jun 15, 2018 at 6:39 AM, Amandeep Singh <amandeep36@gmail.com>
wrote:

> Hi,
>
> I am getting the error below while processing data with Kafka Streams. The
> application had been running for a couple of hours, and the
> 'WatchlistUpdate-StreamThread-9' thread had been assigned to the same
> partition since the beginning. I assume it was able to commit offsets
> successfully for those couple of hours and that the directory
> '/opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2'
> did exist for that period.
>
> Then I started getting the error below every 30 seconds (probably
> because of the offset commit interval), and messages are being missed from
> processing.
>
> Can you please help?
>
>
> 2018-06-15 08:47:58 [WatchlistUpdate-StreamThread-9] WARN o.a.k.s.p.i.ProcessorStateManager:246 - task [0_2] Failed to write checkpoint file to /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.checkpoint:
> java.io.FileNotFoundException: /opt/info/abc/cache/test-abc-kafka-processor/kafka-streams/UI-Watchlist-ES-App/0_2/.checkpoint.tmp (No such file or directory)
>         at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_141]
>         at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_141]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_141]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_141]
>         at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) ~[kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:306) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) [kafka-streams-1.0.0.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) [kafka-streams-1.0.0.jar:na]
>
>
> Stream config:
>
> 2018-06-15 08:09:28 [main] INFO  o.a.k.c.consumer.ConsumerConfig:223 - ConsumerConfig values:
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = earliest
>         bootstrap.servers = [XYZ]
>         check.crcs = true
>         client.id = WatchlistUpdate-StreamThread-9-consumer
>         connections.max.idle.ms = 540000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = UI-Watchlist-ES-App
>         heartbeat.interval.ms = 3000
>         interceptor.classes = null
>         internal.leave.group.on.close = false
>         isolation.level = read_uncommitted
>         key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 2147483647
>         max.poll.records = 1000
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 305000
>         retry.backoff.ms = 100
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism = GSSAPI
>         security.protocol = PLAINTEXT
>         send.buffer.bytes = 131072
>         session.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> Regards,
> Amandeep Singh
>



-- 
-- Guozhang
