nifi-issues mailing list archives

From "Georgy (JIRA)" <>
Subject [jira] [Commented] (NIFI-5593) ConsumeKafka_0_11 topic name variable is not evaluated correctly
Date Fri, 14 Sep 2018 23:08:00 GMT


Georgy commented on NIFI-5593:

OK, I see. Thanks for the explanation!
I have a real-time data flow (syslog) as input, and I have to push it to the destination
Kafka topic once an hour rather than in real time. It is also very important to preserve
message order. That is my use case, and that is why I tried to do it with Kafka.
Can you advise which techniques I can use in NiFi to make this work?

I have two ideas:

1. Via Hive:
a. Create a Hive external table
b. Push flowfiles to HDFS into hourly directories
c. Run msck repair table in Hive at the start of every hour, then select the messages from
the previous hour's partition and sort them by time
This is very slow at the step where Hive returns the result to NiFi, because of the high
traffic volume and the JDBC connection.

2. Via Spark:
a. Push the flow to HDFS as Avro into hourly directories
b. Add an OS command execution processor in NiFi and submit a Spark job that reads the
previous hour's Avro files from HDFS and sorts the messages by time
I haven't tested this yet, but I don't have any other ideas.
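
Whichever engine does the work, both ideas come down to the same step: take the messages that arrived during the previous clock hour and emit them ordered by time. The following is a minimal plain-Python sketch of that step only, not Hive or Spark code. The function names and the `(timestamp, payload)` tuple layout are assumptions made for illustration.

```python
from datetime import datetime, timedelta

def previous_hour_window(now):
    """Return (start, end) of the clock hour before the one containing `now`."""
    end = now.replace(minute=0, second=0, microsecond=0)
    return end - timedelta(hours=1), end

def ordered_previous_hour(messages, now):
    """Select (timestamp, payload) tuples from the previous hour, sorted by time.

    Python's sort is stable, so messages with equal timestamps keep
    their original arrival order.
    """
    start, end = previous_hour_window(now)
    selected = [m for m in messages if start <= m[0] < end]
    return sorted(selected, key=lambda m: m[0])

# Hypothetical sample data: three syslog lines, one outside the window.
sample = [
    (datetime(2018, 9, 14, 14, 30), "second"),
    (datetime(2018, 9, 14, 14, 5), "first"),
    (datetime(2018, 9, 14, 15, 1), "too late"),
]
for ts, payload in ordered_previous_hour(sample, datetime(2018, 9, 14, 15, 5)):
    print(ts.isoformat(), payload)
```

The half-open window (`start <= ts < end`) means a message stamped exactly on the hour belongs to the next batch, so no message is pushed twice.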

> ConsumeKafka_0_11 topic name variable is not evaluated correctly
> ----------------------------------------------------------------
>                 Key: NIFI-5593
>                 URL:
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>         Environment: RHEL 7.3
> JAVA Oracle 1.8.0_144
> NiFi 1.5.0
> Processor name "ConsumeKafka_0_11"
>            Reporter: Georgy
>            Priority: Major
>         Attachments: proc_prop.PNG, proc_sched.PNG, proc_sett.PNG
> Hi guys,
> I found that "Topic Name" specified as the expression
> "dmp_order_${now() :minus(3600000) :format('HH')}" is not evaluated correctly in the
> "ConsumeKafka_0_11" processor after the next hour begins.
> It is evaluated as expected only after a processor start/restart.
> Say the processor is started at 15:05. The value of "Topic Name" will be "dmp_order_14",
> which is correct. It will consume data from that topic and everything is OK.
> After the next hour begins, "Topic Name" is not re-evaluated as "dmp_order_15". I see
> that the consumer still tries to consume data from the topic named "dmp_order_14".
> I tried both scheduling strategies, "Timer driven" and "CRON driven", without success.
> It seems I have to restart the processor every time I want to re-evaluate "Topic Name".
> When I filled the CustomText field with a now() expression in the "GenerateFlowFile"
> processor, I didn't have this issue. It worked as expected with the "CRON driven" strategy.
> Is this a bug or normal behaviour for the ConsumeKafka processor?
> And if it is normal behaviour, is there any way to change "Topic Name" dynamically in a
> running processor?
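
For reference, the topic names the report expects can be reproduced outside NiFi. This is a minimal Python sketch, not NiFi code: the function name `expected_topic` is hypothetical, and it assumes `format('HH')` yields the zero-padded 24-hour value of the local time 3600000 ms (one hour) before "now".

```python
from datetime import datetime, timedelta

def expected_topic(now):
    """Topic name the reporter expects for a given wall-clock time.

    Mirrors the expression in the report: subtract 3600000 ms (one hour)
    from the current time, then format the hour as two digits (00-23).
    """
    previous_hour = now - timedelta(milliseconds=3600000)
    return "dmp_order_" + previous_hour.strftime("%H")

# Times taken from the report: processor started at 15:05, then the next hour begins.
print(expected_topic(datetime(2018, 9, 14, 15, 5)))   # dmp_order_14
print(expected_topic(datetime(2018, 9, 14, 16, 0)))   # dmp_order_15
```

The behaviour described in the report corresponds to calling such a function once at processor start and reusing the result, rather than calling it again after the hour changes.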

This message was sent by Atlassian JIRA
