flume-user mailing list archives

From Jeong-shik Jang <jsja...@gmail.com>
Subject Re: Spooldir -> Kafka sink
Date Fri, 20 May 2016 10:07:35 GMT
Hi Simone,

I see your point.

How about setting "sink.serializer = avro_event" on the file_roll sink, so
that the full event, including headers, is kept rather than the body only,
and then setting "deserializer = AVRO" on the spooldir source to read the
events back?

I am not sure it will work, as I have never tried it, but I mention it in
case you think it makes sense.
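
A minimal, untested sketch of that idea in agent.conf terms. The component
names "fileroll", "spool" and "ch1" and the sink directory are hypothetical
placeholders; only the two serializer settings come from the suggestion
itself. One caveat, as far as I read the user guide: the AVRO deserializer
emits each Avro record as the event body, so the original headers (including
"topic") may still need an extra step to be restored as real event headers.

```properties
# Hypothetical component names and paths; adjust to the real agent.conf.

# file_roll sink: serialize whole events (headers + body) as Avro
# instead of the default body-only text output
agent.sinks.fileroll.type = file_roll
agent.sinks.fileroll.channel = ch1
agent.sinks.fileroll.sink.directory = /var/flume/failover
agent.sinks.fileroll.sink.serializer = avro_event

# spooldir source: read the Avro container files back, one event per record
agent.sources.spool.type = spooldir
agent.sources.spool.channels = ch1
agent.sources.spool.spoolDir = /dir
agent.sources.spool.deserializer = AVRO
```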

JS




2016-05-20 18:48 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:

> Hi Jeong,
>
> that is exactly the problem.
>
> The right topic name (eg: X) is present in the event headers. The Kafka
> sink is working normally with my Thrift source.
>
> I have a problem only when I have to use the spoolDir source. SpoolDir is
> probably not able to read those headers and, instead of sending the event to
> "X", it tries a default topic first ('default-flume-topic') and then fails,
> as shown in the trace that I sent.
>
>
> If I set the topic name in the Flume-ng agent.conf, then the spoolDir
> source works as well
>
> eg: agent.sinks.kafka.topic = 'X'
>
> but of course, I cannot go for this configuration, since I have several
> different topics to work with.
>
>
> Thanks a lot.
>
> Simone Roselli
> ITE Sysadmin
> simone.roselli@plista.com
> http://www.plista.com
>
> ----- Original Message -----
> From: "Jeong-shik Jang" <jsjangg@gmail.com>
> To: "user" <user@flume.apache.org>
> Sent: Friday, May 20, 2016 11:16:17 AM
> Subject: Re: Spooldir -> Kafka sink
>
> Hi Simone,
>
> Thanks for your explanation; I understand the problem better now.
> In that case, how about checking the key name in the headers? It is
> supposed to be "topic".
>
> User guide reads:
> If the event header contains a “topic” field, the event will be published
> to that topic overriding the topic configured here
>
> JS
>
>
> 2016-05-20 18:08 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:
>
> > Hi Jeong,
> >
> > thanks for your answer.
> >
> > I already have my topics in Kafka, I don't need to create new topics.
> > Unfortunately, the problem is different here.
> >
> > Problem in one sentence:
> >
> > ** The Spooldir source is not able to successfully send events to my
> > Kafka topic, if the topic name is not set in the agent.conf **
> >
> >
> >
> > Simone Roselli
> > ITE Sysadmin
> > simone.roselli@plista.com
> > http://www.plista.com
> >
> > ----- Original Message -----
> > From: "Jeong-shik Jang" <jsjangg@gmail.com>
> > To: "user" <user@flume.apache.org>
> > Sent: Friday, May 20, 2016 3:00:02 AM
> > Subject: Re: Spooldir -> Kafka sink
> >
> > Hi Simone,
> >
> > How about starting by checking your Kafka configuration? I think the
> > relevant property is "auto.create.topics.enable".
> >
> > auto.create.topics.enable (default: true)
> > Enable auto creation of topic on the server. If this is set to true then
> > attempts to produce, consume, or fetch metadata for a non-existent topic
> > will automatically create it with the default replication factor and
> > number of partitions.
> >
> > The default value is true, so it is likely enabled, but just to make sure.
> >
> > JS
> >
> > 2016-05-19 22:37 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:
> >
> > > Hallo,
> > >
> > > I'm using 2 sinks (Kafka, Fileroll) in failover.
> > >
> > > If the Kafka sink is temporarily unreachable, the Fileroll sink takes
> > > over and writes events to a local dir.
> > >
> > > Then, I configure a spoolDir source, for a directory /dir, pointing to
> > > the Kafka sink.
> > >
> > > When I try to move an event from the local dir to the spool dir, the
> > > event doesn't reach Kafka and I get this:
> > >
> > > """
> > > 9 May 2016 15:12:39,007 WARN
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.warn:83)  - Error while fetching metadata
> > > [{TopicMetadata for topic default-flume-topic ->
> > > No partition metadata for topic default-flume-topic due to
> > > kafka.common.UnknownTopicOrPartitionException}] for topic
> > > [default-flume-topic]: class
> > > kafka.common.UnknownTopicOrPartitionException
> > >
> > > 19 May 2016 15:12:39,007 ERROR
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.error:97)  - Failed to collate messages by
> > > topic, partition due to: Failed to fetch topic metadata for topic:
> > > default-flume-topic
> > >
> > > 19 May 2016 15:12:39,007 INFO
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.info:68)  - Back off for 100 ms before
> > > retrying send. Remaining retries = 3
> > >
> > > 19 May 2016 15:12:39,108 INFO
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.info:68)  - Fetching metadata from broker
> > > id:1,host:broker01.doamain.com,port:9092 with correlation id 45270
> > > for 2 topic(s) Set(MyTopic, default-flume-topic)
> > >
> > > ...
> > >
> > > 19 May 2016 15:12:39,433 ERROR
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.error:97)  - Failed to send requests for
> > > topics MyTopic,default-flume-topic with correlation ids in [xxx,xxx]
> > >
> > > """
> > >
> > > default-flume-topic = default Kafka topic used by the flume-ng Kafka sink
> > > MyTopic = my actual target topic, present in the event headers
> > >
> > > In the agent.conf I didn't set any topic name, as topic names are
> > > dynamically assigned. If I define a topic name in the agent.conf, then
> > > it works.
> > >
> > >
> > > Any clues?
> > > Thanks
> > >
> > >
> > >
> > > Simone Roselli
> > > ITE Sysadmin
> > > simone.roselli@plista.com
> > > http://www.plista.com
> > >
> >
>
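
For reference, the failover setup described in the original message might look
roughly like the sketch below. It is untested and assumes Flume 1.6-era
property names for the Kafka sink; the sink group "g1", the sink name
"fileroll", the channel "ch1", the broker address and the directory are
hypothetical placeholders. Only the sink name "kafka" and the missing topic
setting are taken from the thread.

```properties
# Hypothetical names and addresses; adjust to the real agent.conf.
agent.sinks = kafka fileroll
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = kafka fileroll
agent.sinkgroups.g1.processor.type = failover
# Higher priority is tried first: Kafka is preferred, file_roll takes over
# when the Kafka sink fails
agent.sinkgroups.g1.processor.priority.kafka = 10
agent.sinkgroups.g1.processor.priority.fileroll = 5

agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka.channel = ch1
agent.sinks.kafka.brokerList = broker01.example.com:9092
# No "topic" is set here, so the sink uses default-flume-topic unless the
# event carries a "topic" header

agent.sinks.fileroll.type = file_roll
agent.sinks.fileroll.channel = ch1
agent.sinks.fileroll.sink.directory = /var/flume/failover
```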
