flume-issues mailing list archives

From "huaicui (Jira)" <j...@apache.org>
Subject [jira] [Updated] (FLUME-3358) The kafka channel does not work properly when the interceptor filters into an empty event queue
Date Wed, 04 Mar 2020 11:17:00 GMT

     [ https://issues.apache.org/jira/browse/FLUME-3358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huaicui updated FLUME-3358:
---------------------------
    Description: 
I have a requirement to stream and filter Kafka topics according to business rules. Because
the interceptor's filter can leave the List<Event> empty, the whole pipeline stops working properly.

The interceptor logic looks like this:

String key = JsonPath.read(message, "$.key");
switch (key) {
  case "test1":
    return process(key, event);
  case "test2":
    return process(key, event);
  case "test3":
    return process(key, event);
  default:
    return null;
}

When every event in a batch is filtered out, the pipeline (Kafka -> HDFS) stays in an
abnormal state.
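The failure condition can be reproduced with a minimal, dependency-free sketch of the filtering above. `SimpleEvent` and `filterBatch` are illustrative names, not Flume API, and the JsonPath lookup is replaced by a plain key field; the point is only that a batch whose keys all fall through to `default` filters down to an empty list:

```java
import java.util.ArrayList;
import java.util.List;

public class FilterSketch {
    // Stand-in for a Flume Event: just a routing key and a body.
    record SimpleEvent(String key, String body) {}

    // Mirrors the interceptor's switch: keep known keys, drop everything else.
    static SimpleEvent process(SimpleEvent e) {
        switch (e.key()) {
            case "test1":
            case "test2":
            case "test3":
                return e;
            default:
                return null; // filtered out
        }
    }

    // Batch-level filtering: if no key matches, the result is an EMPTY list,
    // which is the condition that leaves the kafka->hdfs pipeline stuck.
    static List<SimpleEvent> filterBatch(List<SimpleEvent> batch) {
        List<SimpleEvent> out = new ArrayList<>();
        for (SimpleEvent e : batch) {
            SimpleEvent kept = process(e);
            if (kept != null) {
                out.add(kept);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<SimpleEvent> batch = List.of(
            new SimpleEvent("other1", "{}"),
            new SimpleEvent("other2", "{}"));
        System.out.println(filterBatch(batch).size()); // prints 0
    }
}
```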

 

This is my configuration:
# device flume
Test.sources = r1
Test.sinks = k1
Test.channels = c1

Test.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
Test.sources.r1.kafka.bootstrap.servers = xxxx
Test.sources.r1.topic = xxx
Test.sources.r1.groupId = test_lab_1
Test.sources.r1.kafka.consumer.timeout.ms = 100
Test.sources.r1.interceptors = i1
# This is my custom interceptor
Test.sources.r1.interceptors.i1.type = com.goe.DeviceUsageDeserializerInterceptor$Builder

# Describe the sink
Test.sinks.k1.type = hdfs
Test.sinks.k1.hdfs.path = /user/naming/%{DeviceDir}
Test.sinks.k1.hdfs.filePrefix = device-
Test.sinks.k1.hdfs.fileSuffix = .csv
Test.sinks.k1.hdfs.inUseSuffix = .tmp
Test.sinks.k1.hdfs.idleTimeout = 120
Test.sinks.k1.hdfs.writeFormat = Text
Test.sinks.k1.hdfs.batchSize = 100
Test.sinks.k1.hdfs.threadsPoolSize = 10
Test.sinks.k1.hdfs.rollSize = 0
Test.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
Test.channels.c1.type = memory
Test.channels.c1.capacity = 10000
Test.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
Test.sources.r1.channels = c1
Test.sinks.k1.channel = c1

This is the exception:

!image-2020-03-04-19-13-16-068.png!



> The kafka channel does not work properly when the interceptor filters into an empty event queue
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3358
>                 URL: https://issues.apache.org/jira/browse/FLUME-3358
>             Project: Flume
>          Issue Type: Bug
>          Components: Kafka Channel
>    Affects Versions: 1.9.0
>            Reporter: huaicui
>            Priority: Blocker
>         Attachments: exception.jpg
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

