flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jeong-shik Jang <jsja...@gmail.com>
Subject Re: Spooldir -> Kafka sink
Date Fri, 20 May 2016 01:00:02 GMT
Hi Simone,

How about starting from checking your Kafka configuration? The related
property name I think is "auto.create.topics.enable".

auto.create.topics.enable true Enable auto creation of topic on the server.
If this is set to true then attempts to produce, consume, or fetch metadata
for a non-existent topic will automatically create it with the default
replication factor and number of partitions.

Default value is true so likely it is enabled but just to make sure.

JS

2016-05-19 22:37 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:

> Hallo,
>
> I'm using 2 sinks (Kafka, Fileroll) in failover.
>
> If the Kafka sink is temporary unreachable, the Fileroll takes over and
> writes events on a local dir.
>
> Then, I configure a spoolDir source, for a directory /dir, pointing to the
> Kafka sink.
>
> When I try to move an event from the local dir to the spool dir, the event
> doesn't reach Kafka and I get this:
>
> """
> 9 May 2016 15:12:39,007 WARN
> [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (kafka.utils.Logging$class.warn:83)  - Error while fetching metadata
> [{TopicMetadata for topic default-flume-topic ->
> No partition metadata for topic default-flume-topic due to
> kafka.common.UnknownTopicOrPartitionException}] for topic
> [default-flume-topic]: class kafka.common.UnknownTopicOrPartitionException
>
> 19 May 2016 15:12:39,007 ERROR
> [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (kafka.utils.Logging$class.error:97)  - Failed to collate messages by
> topic, partition due to: Failed to fetch topic metadata for topic:
> default-flume-topic
>
> 19 May 2016 15:12:39,007 INFO
> [SinkRunner-PollingRunner-FailoverSinkProcessor] (kafka.utils.Logging$
> class.info:68)  - Back off for 100 ms before retrying send. Remaining
> retries = 3
>
> 19 May 2016 15:12:39,108 INFO
> [SinkRunner-PollingRunner-FailoverSinkProcessor] (kafka.utils.Logging$
> class.info:68)  - Fetching metadata from broker id:1,host:
> broker01.doamain.com,port:9092 with correlation id 45270 for 2 topic(s)
> Set(MyTopic, default-flume-topic)
>
> ...
>
> 19 May 2016 15:12:39,433 ERROR
> [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (kafka.utils.Logging$class.error:97)  - Failed to send requests for topics
> MyTopic,default-flume-topic with correlation ids in [xxx,xxx]
>
> """
>
> default-flume-topic = kafka topic used by flume-ng Kafka sink
> MyTopic = my actual target topic, present in the event headers
>
> In the agent.conf i didn't set any topic name as topic names are
> dynamically assigned. If I define a topic name in the agent.conf, then it
> works.
>
>
> Any clues?
> Thanks
>
>
>
> Simone Roselli
> ITE Sysadmin
> simone.roselli@plista.com
> http://www.plista.com
>

Mime
View raw message