flume-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Li Ye (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLUME-2814) flume kafka sink does not write events to configured sink topic when source is also from other topic of kafka
Date Mon, 25 Apr 2016 13:40:13 GMT

     [ https://issues.apache.org/jira/browse/FLUME-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Li Ye updated FLUME-2814:
-------------------------
    Attachment: FLUME-2789-2.patch

> flume kafka sink does not write events to configured sink topic when source is also from
other topic of kafka
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2814
>                 URL: https://issues.apache.org/jira/browse/FLUME-2814
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.5.0
>            Reporter: Manohar
>         Attachments: FLUME-2789-1.patch, FLUME-2789-2.patch
>
>
> I was testing a case when flume agent is reading from kafka source from topic 'sourcetopic'
and sink configured to kafkasink but to other topic 'destinationtopic', 
> tier1.sources  = source1
> tier1.channels = channel1
> tier1.sinks    = sink1
> tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
> tier1.sources.source1.channels = channel1
> tier1.sources.source1.zookeeperConnect = localhost:2181
> tier1.sources.source1.topic = sourcetopic
> tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
> tier1.sinks.sink1.topic = destinationtopic
> tier1.sinks.sink1.brokerList = localhost:9092
> tier1.sinks.sink1.channel = channel1
> tier1.channels.channel1.type = memory
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 1000
> With this settings i noticed that event were not written to 'destinationtopic', 
> After debugging the agent if found that kafka source puts in topic name in header. 
> headers.put(KafkaSourceConstants.TOPIC, topic);
> and in sink check is made to see if headers contain topic, if exists then we take topic
name from header and write event that topic and there by discarding configured sink topic
i ,e destinationtopic.
> here is code snippet that does, even though variable topic as destinationtopic, since
header had topic, kafka sink takes topic name from header and puts event to that topic i,e
again to source topic
>         if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
>           eventTopic = topic;
>         }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message