flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Simone Roselli <simone.rose...@plista.com>
Subject Re: Spooldir -> Kafka sink
Date Fri, 20 May 2016 09:48:46 GMT
Hi Jeong,

that is exactly the problem.

The right topic name (eg: X) is present in the event headers. The Kakfa sink is working normally
with my Thrift source.

I have a problem only when I have to use the spoolDir source. SpoolDir is probably not able
to read those headers and instead of sending the event to "X", it tries a default topic first
('default-flume-topic') and then fails, like showed in the trace that I sent.


If I write the topic name in the Flume-ng agent.conf, then, the spoolDir is working as well

eg: agent.sinks.kafka.topic = 'X'

but of course, I cannot go for this configuration, since I have several different topics to
work with.


Thanks a lot.

Simone Roselli
ITE Sysadmin
simone.roselli@plista.com
http://www.plista.com

----- Original Message -----
From: "Jeong-shik Jang" <jsjangg@gmail.com>
To: "user" <user@flume.apache.org>
Sent: Friday, May 20, 2016 11:16:17 AM
Subject: Re: Spooldir -> Kafka sink

Hi Simone,

I got better understanding; thanks for your explanation.
In that case, how about checking key name in headers; it is supposed to be
"topic".

User guide reads:
If the event header contains a “topic” field, the event will be published
to that topic overriding the topic configured here

JS


2016-05-20 18:08 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:

> Hi Jeong,
>
> thanks for your answer.
>
> I already have my topics in Kafka, I don't need to create new topics.
> Unfortunately, the problem is different here.
>
> Problem in one sentence:
>
> ** The Spooldir source is not able to successfully send events to my Kafka
> topic, if the topic name is not set in the agent.conf **
>
>
>
> Simone Roselli
> ITE Sysadmin
> simone.roselli@plista.com
> http://www.plista.com
>
> ----- Original Message -----
> From: "Jeong-shik Jang" <jsjangg@gmail.com>
> To: "user" <user@flume.apache.org>
> Sent: Friday, May 20, 2016 3:00:02 AM
> Subject: Re: Spooldir -> Kafka sink
>
> Hi Simone,
>
> How about starting from checking your Kafka configuration? The related
> property name I think is "auto.create.topics.enable".
>
> auto.create.topics.enable true Enable auto creation of topic on the server.
> If this is set to true then attempts to produce, consume, or fetch metadata
> for a non-existent topic will automatically create it with the default
> replication factor and number of partitions.
>
> Default value is true so likely it is enabled but just to make sure.
>
> JS
>
> 2016-05-19 22:37 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:
>
> > Hallo,
> >
> > I'm using 2 sinks (Kafka, Fileroll) in failover.
> >
> > If the Kafka sink is temporary unreachable, the Fileroll takes over and
> > writes events on a local dir.
> >
> > Then, I configure a spoolDir source, for a directory /dir, pointing to
> the
> > Kafka sink.
> >
> > When I try to move an event from the local dir to the spool dir, the
> event
> > doesn't reach Kafka and I get this:
> >
> > """
> > 9 May 2016 15:12:39,007 WARN
> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > (kafka.utils.Logging$class.warn:83)  - Error while fetching metadata
> > [{TopicMetadata for topic default-flume-topic ->
> > No partition metadata for topic default-flume-topic due to
> > kafka.common.UnknownTopicOrPartitionException}] for topic
> > [default-flume-topic]: class
> kafka.common.UnknownTopicOrPartitionException
> >
> > 19 May 2016 15:12:39,007 ERROR
> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > (kafka.utils.Logging$class.error:97)  - Failed to collate messages by
> > topic, partition due to: Failed to fetch topic metadata for topic:
> > default-flume-topic
> >
> > 19 May 2016 15:12:39,007 INFO
> > [SinkRunner-PollingRunner-FailoverSinkProcessor] (kafka.utils.Logging$
> > class.info:68)  - Back off for 100 ms before retrying send. Remaining
> > retries = 3
> >
> > 19 May 2016 15:12:39,108 INFO
> > [SinkRunner-PollingRunner-FailoverSinkProcessor] (kafka.utils.Logging$
> > class.info:68)  - Fetching metadata from broker id:1,host:
> > broker01.doamain.com,port:9092 with correlation id 45270 for 2 topic(s)
> > Set(MyTopic, default-flume-topic)
> >
> > ...
> >
> > 19 May 2016 15:12:39,433 ERROR
> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > (kafka.utils.Logging$class.error:97)  - Failed to send requests for
> topics
> > MyTopic,default-flume-topic with correlation ids in [xxx,xxx]
> >
> > """
> >
> > default-flume-topic = kafka topic used by flume-ng Kafka sink
> > MyTopic = my actual target topic, present in the event headers
> >
> > In the agent.conf i didn't set any topic name as topic names are
> > dynamically assigned. If I define a topic name in the agent.conf, then it
> > works.
> >
> >
> > Any clues?
> > Thanks
> >
> >
> >
> > Simone Roselli
> > ITE Sysadmin
> > simone.roselli@plista.com
> > http://www.plista.com
> >
>

Mime
View raw message