flume-user mailing list archives

From Jeong-shik Jang <jsja...@gmail.com>
Subject Re: Spooldir -> Kafka sink
Date Fri, 20 May 2016 13:45:13 GMT
Hi Simone,

I see.

I just ran a quick test and, out of curiosity, checked the related source
code. You are right: all AvroEventDeserializer does is construct a Flume event
with the Avro schema info in the headers and the payload in the body, which
could probably be converted back to the original Flume event object with some
additional work. In any case, I found no "topic" key in the headers, only
"flume.avro.schema.{type}".

My understanding is that you would need to implement your own deserializer
that goes one step further than AvroEventDeserializer and converts the event
with the Avro schema and payload back into the original Flume event object; or
else write your own simple serializer (like HeaderAndBodyTextEventSerializer)
for file_roll, paired with a matching deserializer for spooldir, fitting your
event data model.
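To illustrate the second option: if a HeaderAndBodyTextEventSerializer-style
serializer wrote each event as a "{key=value, ...} body" line, the matching
deserializer would essentially just parse that line back apart. The sketch
below is illustrative only (not Flume code), and the exact on-disk format is
an assumption; adjust the parsing to whatever your serializer actually emits.

```python
# Sketch: recover (headers, body) from a line written in a
# "{key=value, ...} body" format. The Kafka sink can then use the
# recovered "topic" header to route the event.

def parse_event_line(line):
    """Split a serialized line back into (headers dict, body string)."""
    if not line.startswith("{"):
        return {}, line  # no headers were serialized on this line
    end = line.index("}")
    header_part = line[1:end]
    body = line[end + 1:].lstrip()
    headers = {}
    for pair in header_part.split(","):
        if "=" in pair:
            key, value = pair.split("=", 1)
            headers[key.strip()] = value.strip()
    return headers, body

headers, body = parse_event_line("{topic=MyTopic, timestamp=1463750000} hello kafka")
# headers["topic"] is what the Kafka sink needs for routing
```

Note this naive split breaks if the body itself starts with "{"; a real
deserializer would want an unambiguous delimiter or length-prefixed framing.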

Sorry I couldn't be more helpful.

JS


2016-05-20 19:26 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:

> Hi Jeong,
>
> unfortunately, the configuration you propose is already in place:
>
> # Fileroll
> agent.sinks.fileroll.sink.serializer = avro_event
>
> # SpoolDir
> agent.sources.spool.deserializer = AVRO
>
> As far as I remember, when I chose this serialization method, there weren't
> many alternatives.
>
> However, I also suspect that the problem is in the serialization process. I'm
> not sure; if so, this would be completely blocking for me.
>
>
> Many thanks
>
>
> Simone Roselli
> Devops Engineer
> simone.roselli@plista.com
> http://www.plista.com
> http://tech.plista.com
>
> ----- Original Message -----
> From: "Jeong-shik Jang" <jsjangg@gmail.com>
> To: "user" <user@flume.apache.org>
> Sent: Friday, May 20, 2016 12:07:35 PM
> Subject: Re: Spooldir -> Kafka sink
>
> Hi Simone,
>
> I see your point.
>
> How about using "sink.serializer = avro_event" for the file_roll sink, so
> that you keep the full event information (including headers) rather than the
> body only, and then using "deserializer = AVRO" for the spooldir source to
> recover the events?
>
> I'm not sure it will work, as I've never tried it, but I mention it in case
> you think it makes sense.
>
> JS
>
>
>
>
> 2016-05-20 18:48 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:
>
> > Hi Jeong,
> >
> > that is exactly the problem.
> >
> > The right topic name (e.g. X) is present in the event headers. The Kafka
> > sink works normally with my Thrift source.
> >
> > I have a problem only when I have to use the spoolDir source. SpoolDir is
> > probably not able to read those headers; instead of sending the event to
> > "X", it tries a default topic first ('default-flume-topic') and then
> > fails, as shown in the trace that I sent.
> >
> >
> > If I write the topic name in the flume-ng agent.conf, then the spoolDir
> > source works as well:
> >
> > e.g.: agent.sinks.kafka.topic = 'X'
> >
> > but of course, I cannot go for this configuration, since I have several
> > different topics to work with.
> >
> >
> > Thanks a lot.
> >
> > Simone Roselli
> > ITE Sysadmin
> > simone.roselli@plista.com
> > http://www.plista.com
> >
> > ----- Original Message -----
> > From: "Jeong-shik Jang" <jsjangg@gmail.com>
> > To: "user" <user@flume.apache.org>
> > Sent: Friday, May 20, 2016 11:16:17 AM
> > Subject: Re: Spooldir -> Kafka sink
> >
> > Hi Simone,
> >
> > I have a better understanding now; thanks for your explanation.
> > In that case, how about checking the key name in the headers? It is
> > supposed to be "topic".
> >
> > The user guide reads:
> > If the event header contains a “topic” field, the event will be published
> > to that topic overriding the topic configured here
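> > In agent.conf terms, that rule means any configured topic is only a
> > fallback (a sketch; the property names follow the Flume 1.6 Kafka sink
> > docs, and the sink name is illustrative):
> >
> > ```properties
> > # default topic, used only when an event has no "topic" header
> > agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
> > agent.sinks.kafka.topic = default-flume-topic
> > # an event carrying the header topic=MyTopic goes to MyTopic instead
> > ```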
> >
> > JS
> >
> >
> > 2016-05-20 18:08 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:
> >
> > > Hi Jeong,
> > >
> > > thanks for your answer.
> > >
> > > I already have my topics in Kafka; I don't need to create new ones.
> > > Unfortunately, the problem here is different.
> > >
> > > Problem in one sentence:
> > >
> > > ** The Spooldir source is not able to successfully send events to my
> > > Kafka topic if the topic name is not set in the agent.conf **
> > >
> > >
> > >
> > > Simone Roselli
> > > ITE Sysadmin
> > > simone.roselli@plista.com
> > > http://www.plista.com
> > >
> > > ----- Original Message -----
> > > From: "Jeong-shik Jang" <jsjangg@gmail.com>
> > > To: "user" <user@flume.apache.org>
> > > Sent: Friday, May 20, 2016 3:00:02 AM
> > > Subject: Re: Spooldir -> Kafka sink
> > >
> > > Hi Simone,
> > >
> > > How about starting by checking your Kafka configuration? The relevant
> > > property, I think, is "auto.create.topics.enable".
> > >
> > > auto.create.topics.enable (default: true): Enable auto creation of
> > > topics on the server. If this is set to true, then attempts to produce,
> > > consume, or fetch metadata for a non-existent topic will automatically
> > > create it with the default replication factor and number of partitions.
> > >
> > > The default value is true, so it is likely enabled, but it's worth
> > > checking to make sure.
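> > > On the broker side, the corresponding server.properties line would look
> > > like this (a sketch; true is already the default, so you would only need
> > > to add it if it has been overridden):
> > >
> > > ```properties
> > > # Kafka broker config: let producers auto-create missing topics
> > > auto.create.topics.enable=true
> > > ```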
> > >
> > > JS
> > >
> > > 2016-05-19 22:37 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:
> > >
> > > > Hallo,
> > > >
> > > > I'm using 2 sinks (Kafka, Fileroll) in failover.
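> > > > A failover pair like that is configured along these lines (a sketch
> > > > using the standard Flume sink-group properties; the sink names here
> > > > are illustrative):
> > > >
> > > > ```properties
> > > > # group the two sinks; the higher-priority sink is preferred
> > > > agent.sinkgroups = g1
> > > > agent.sinkgroups.g1.sinks = kafka fileroll
> > > > agent.sinkgroups.g1.processor.type = failover
> > > > agent.sinkgroups.g1.processor.priority.kafka = 10
> > > > agent.sinkgroups.g1.processor.priority.fileroll = 5
> > > > agent.sinkgroups.g1.processor.maxpenalty = 10000
> > > > ```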
> > > >
> > > > If the Kafka sink is temporarily unreachable, the Fileroll sink takes
> > > > over and writes events to a local dir.
> > > >
> > > > Then, I configure a spoolDir source for a directory /dir, pointing to
> > > > the Kafka sink.
> > > >
> > > > When I try to move an event from the local dir to the spool dir, the
> > > > event doesn't reach Kafka and I get this:
> > > >
> > > > """
> > > > 9 May 2016 15:12:39,007 WARN
> > > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > > (kafka.utils.Logging$class.warn:83)  - Error while fetching metadata
> > > > [{TopicMetadata for topic default-flume-topic ->
> > > > No partition metadata for topic default-flume-topic due to
> > > > kafka.common.UnknownTopicOrPartitionException}] for topic
> > > > [default-flume-topic]: class
> > > kafka.common.UnknownTopicOrPartitionException
> > > >
> > > > 19 May 2016 15:12:39,007 ERROR
> > > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > > (kafka.utils.Logging$class.error:97)  - Failed to collate messages by
> > > > topic, partition due to: Failed to fetch topic metadata for topic:
> > > > default-flume-topic
> > > >
> > > > 19 May 2016 15:12:39,007 INFO
> > > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (kafka.utils.Logging$
> > > > class.info:68)  - Back off for 100 ms before retrying send.
> Remaining
> > > > retries = 3
> > > >
> > > > 19 May 2016 15:12:39,108 INFO
> > > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (kafka.utils.Logging$
> > > > class.info:68)  - Fetching metadata from broker id:1,host:
> > > > broker01.doamain.com,port:9092 with correlation id 45270 for 2
> > topic(s)
> > > > Set(MyTopic, default-flume-topic)
> > > >
> > > > ...
> > > >
> > > > 19 May 2016 15:12:39,433 ERROR
> > > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > > (kafka.utils.Logging$class.error:97)  - Failed to send requests for
> > > topics
> > > > MyTopic,default-flume-topic with correlation ids in [xxx,xxx]
> > > >
> > > > """
> > > >
> > > > default-flume-topic = kafka topic used by flume-ng Kafka sink
> > > > MyTopic = my actual target topic, present in the event headers
> > > >
> > > > In the agent.conf I didn't set any topic name, as topic names are
> > > > dynamically assigned. If I define a topic name in the agent.conf, then
> > > > it works.
> > > >
> > > >
> > > > Any clues?
> > > > Thanks
> > > >
> > > >
> > > >
> > > > Simone Roselli
> > > > ITE Sysadmin
> > > > simone.roselli@plista.com
> > > > http://www.plista.com
> > > >
> > >
> >
>
