flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Zhishan Li <zhishan...@gmail.com>
Subject flume interceptor issue
Date Thu, 12 Nov 2015 12:34:16 GMT
I have a question: how to set two interceptors to a channel?

Here is my configuration:

# configuration start
tier1.sources.KafkaSource.channels = c1 c2
tier1.sources.KafkaSource.interceptors = i1 i2
tier1.sources.KafkaSource.interceptors.i1.type = host
tier1.sources.KafkaSource.interceptors.i1.useIP = true

tier1.sources.KafkaSource.interceptors.i2.type = org.apache.flume.interceptor.My$Builder
tier1.sources.KafkaSource.interceptors.i2.key = logType
tier1.sources.KafkaSource.interceptors.i2.value = app

tier1.sources.KafkaSource.selector.type = multiplexing
tier1.sources.KafkaSource.selector.header = logType
tier1.sources.KafkaSource.selector.mapping.app = c1
tier1.sources.KafkaSource.selector.default = c2



The Json format source: KafkSource will be processed by the interceptor: My$Builder, which
will divide the source data to two part - the one with logType=app and another’s logType
!= app.

for example:

{“logType”:”app”,”id”:1,"env”:”test"}
{“logType”:”phone”,”id”:2,}
{“logType”:”phone”,”id”:3,”env”:”test"}

the “id”=1 will be trans to channel c1 and the id=2 and id=3 will be sent to channel c2.

So my question is: what a new interceptor can be set to make the record with logType=app and
env=test be sent to channel c2?

Here is my thought, append the below interceptor:

tier1.sources.KafkaSource.interceptors.i3.type = org.apache.flume.interceptor.Myr$Builder
tier1.sources.KafkaSource.interceptors.i3.key = env
tier1.sources.KafkaSource.interceptors.i3.value = test

But it still make the record with id=2 be sent to channel c2 too.

Please help me figure out.

I expect the interceptor i3 only consume a part of i2’s output with logType=app.

Thanks 

I 
Mime
View raw message