From: "huaicui (Jira)"
To: issues@flume.apache.org
Reply-To: dev@flume.apache.org
Date: Wed, 4 Mar 2020 12:43:00 +0000 (UTC)
Subject: [jira] [Resolved] (FLUME-3358) The kafka channel does not work properly when the interceptor filters into an empty event queue

     [ https://issues.apache.org/jira/browse/FLUME-3358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huaicui resolved FLUME-3358.
----------------------------
    Resolution: Not A Bug

> The kafka channel does not work properly when the interceptor filters into an empty event queue
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3358
>                 URL: https://issues.apache.org/jira/browse/FLUME-3358
>             Project: Flume
>          Issue Type: Bug
>          Components: Kafka Channel
>    Affects Versions: 1.9.0
>            Reporter: huaicui
>            Priority: Trivial
>         Attachments: exception.jpg
>
>
> I have a requirement to stream and filter Kafka topics according to business rules. Because the filter can leave the event list empty, the whole pipeline does not work properly.
> The logic looks like this:
> String key = JsonPath.read(message, "$.key");
> switch (key) {
>     case "test1": return process(key, event);
>     case "test2": return process(key, event);
>     case "test3": return process(key, event);
>     default: return null;
> }
> When all events in a queue are filtered out, this pipeline (kafka->hdfs) stays in an abnormal state.
>
> This is my configuration:
> # device flume
> Test.sources = r1
> Test.sinks = k1
> Test.channels = c1
> Test.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
> Test.sources.r1.kafka.bootstrap.servers = xxxx
> Test.sources.r1.topic = xxx
> Test.sources.r1.groupId = test_lab_1
> Test.sources.r1.kafka.consumer.timeout.ms = 100
> Test.sources.r1.interceptors = i1
> Test.sources.r1.interceptors.i1.type = com.goe.DeviceUsageDeserializerInterceptor$Builder
> # This is my custom interceptor
> # Describe the sink
> Test.sinks.k1.type = hdfs
> Test.sinks.k1.hdfs.path = /user/naming/%{DeviceDir}
> Test.sinks.k1.hdfs.filePrefix = device-
> Test.sinks.k1.hdfs.fileSuffix = .csv
> Test.sinks.k1.hdfs.inUseSuffix = .tmp
> Test.sinks.k1.hdfs.idleTimeout = 120
> Test.sinks.k1.hdfs.writeFormat = Text
> Test.sinks.k1.hdfs.batchSize = 100
> Test.sinks.k1.hdfs.threadsPoolSize = 10
> Test.sinks.k1.hdfs.rollSize = 0
> Test.sinks.k1.hdfs.rollCount = 0
> # Use a channel which buffers events in memory
> Test.channels.c1.type = memory
> Test.channels.c1.capacity = 10000
> Test.channels.c1.transactionCapacity = 1000
> # Bind the source and sink to the channel
> Test.sources.r1.channels = c1
> Test.sinks.k1.channel = c1
> This is the exception:
> !image-2020-03-04-19-13-16-068.png!
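
For reference, the filtering logic quoted in the report can be sketched so that a fully filtered batch yields an empty list rather than a list containing nulls (or null itself). This is only an illustration under stated assumptions: the ticket does not record why it was closed as Not A Bug, and the names `FilteringSketch`, `SketchEvent`, `interceptOne` and `interceptBatch` are hypothetical stand-ins, not Flume's `Interceptor` or `Event` types. The key is passed in directly instead of being read with `JsonPath`, to keep the sketch free of dependencies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; these are stand-in types, not Flume's API.
final class FilteringSketch {

    // Stand-in for an event: a routing key plus a payload.
    static final class SketchEvent {
        final String key;
        final String body;

        SketchEvent(String key, String body) {
            this.key = key;
            this.body = body;
        }
    }

    // Keys accepted by the switch statement in the report.
    private static final List<String> ACCEPTED_KEYS =
            Arrays.asList("test1", "test2", "test3");

    // Single-event filter: returns the event if its key is accepted,
    // otherwise null to signal that the event should be dropped.
    static SketchEvent interceptOne(SketchEvent event) {
        return ACCEPTED_KEYS.contains(event.key) ? event : null;
    }

    // Batch filter: copies only the surviving events into a new list.
    // The result never contains null and is never null itself; when
    // every event is dropped, an empty list is returned.
    static List<SketchEvent> interceptBatch(List<SketchEvent> events) {
        List<SketchEvent> kept = new ArrayList<>();
        for (SketchEvent event : events) {
            SketchEvent result = interceptOne(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // A batch in which no key matches: the filter drops everything.
        List<SketchEvent> batch = Arrays.asList(
                new SketchEvent("other", "a"),
                new SketchEvent("unknown", "b"));
        System.out.println("kept: " + interceptBatch(batch).size());
    }
}
```

In a real interceptor the same pattern would apply to the batch method: delegate to the single-event method and skip null results, so that downstream code always receives a well-formed (possibly empty) list.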