nifi-dev mailing list archives

From 尹文才 <batman...@gmail.com>
Subject Re: proper way in nifi to sync status between custom processors
Date Wed, 27 Dec 2017 04:19:34 GMT
Hi Koji,

I will log the SQL before actually executing it. However, I checked the
error log line you mentioned in your reply, and that error was thrown by
NiFi from within another processor, called WaitBatch.
I didn't find any other errors similar to the one from the
ExecuteSqlCommand processor. I think that is because ExecuteSqlCommand is
the only processor used to create temp database tables.
You could check my ExecuteSqlCommand code via the link:
https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P

If the error is really caused by a FlowFile repository checkpoint failure
and the FlowFile was processed twice, I may have to create the temp table
only if it doesn't already exist. I didn't fix the bug this way right away
because I was afraid the fix could mask other problems.
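
A minimal sketch of the "create only if it doesn't exist" idea, assuming
SQL Server's OBJECT_ID function and the table names from the log in this
thread. The class IdempotentTempTableSql and its build method are
hypothetical and are not taken from the linked ExecuteSqlCommand code; the
generated statement has not been run against a database here.

```java
import java.util.regex.Pattern;

/** Hypothetical helper; not part of the ExecuteSqlCommand source. */
final class IdempotentTempTableSql {
    // Accept only plain schema.table names so they can be embedded in SQL text.
    private static final Pattern QUALIFIED_NAME =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*\\.[A-Za-z_][A-Za-z0-9_]*");

    private IdempotentTempTableSql() {
    }

    /** Builds a T-SQL batch that creates the temp table only when it is absent. */
    static String build(String tempTable, String sourceTable) {
        requireQualifiedName(tempTable);
        requireQualifiedName(sourceTable);
        // OBJECT_ID(name, N'U') is NULL when no user table with that name exists,
        // so a replayed FlowFile skips the block instead of failing.
        return "IF OBJECT_ID(N'" + tempTable + "', N'U') IS NULL\n"
                + "BEGIN\n"
                + "    SELECT TOP 0 * INTO " + tempTable + " FROM " + sourceTable + ";\n"
                + "    ALTER TABLE " + tempTable + " DROP COLUMN _id;\n"
                + "END";
    }

    private static void requireQualifiedName(String name) {
        if (name == null || !QUALIFIED_NAME.matcher(name).matches()) {
            throw new IllegalArgumentException("Expected schema.table, got: " + name);
        }
    }

    public static void main(String[] args) {
        System.out.println(build(
                "tmp.ods_extractDataDebug_20171226031801926_9195",
                "dbo.ods_extractDataDebug"));
    }
}
```

Such a guard makes the statement safe to repeat, but it also hides the fact
that a FlowFile was replayed, so logging the "table already exists" case
separately would keep the underlying problem visible.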

Thanks.

Regards,
Ben

2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokarumawak@gmail.com>:

> Hi Ben,
>
> The following two log messages have nearly identical timestamps but
> different log levels:
> 2017-12-26 07:00:01,312 INFO
> 2017-12-26 07:00:01,315 ERROR
>
> I guess those are logged within a single onTrigger of your
> ExecuteSqlCommand custom processor: one before executing the SQL, the
> other when it caught an exception. I'm only guessing, as I don't have
> access to the code.
>
> Does the same issue happen with other processors bundled with Apache
> NiFi without your custom processor running?
>
> If NiFi fails to update/checkpoint FlowFile repository, then the same
> FlowFile can be processed again after restarting NiFi.
>
> Thanks,
> Koji
>
>
>
> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman713@gmail.com> wrote:
> > Thanks Koji, I will look into this article about the record model.
> >
> > By the way, the error I previously mentioned to you occurred again. I
> > could see in the log that the SQL query was executed twice. This time I
> > had turned on verbose NiFi logging. The SQL query is below:
> >
> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> > c.z.nifi.processors.ExecuteSqlCommand
> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement:
> > SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> > dbo.ods_extractDataDebug;
> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
> >
> > and it was executed again later:
> >
> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> > c.z.nifi.processors.ExecuteSqlCommand
> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> > Failed to execute SQL statement: SELECT
> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> > dbo.ods_extractDataDebug;
> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
> > object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
> > object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
> > Repository failed to update". I'm not sure whether this is the reason the
> > FlowFile got processed twice. Could you help take a look at my log file?
> > Thanks.
> > You can get the log file via the link:
> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
> >
> > Best Regards,
> > Ben
> >
> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokarumawak@gmail.com>:
> >
> >> Hi Ben,
> >>
> >> This blog post, written by Mark, would be a good starting point for
> >> getting familiar with the NiFi Record model.
> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
> >>
> >> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
> >> pair is not supported at the moment. If you need high availability,
> >> RedisDistributedMapCacheClientService with Redis replication will
> >> provide that, although I haven't tried it myself.
> >> https://redis.io/topics/replication
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman713@gmail.com> wrote:
> >> > Thanks for your quick response, Koji. I haven't seen or heard anything
> >> > about the NiFi record data model while reading the NiFi
> >> > documentation. Could you tell me where this model is documented? Thanks.
> >> >
> >> > By the way, to my knowledge, to use the DistributedMapCacheServer
> >> > from DistributedMapCacheClientService, you need to specify the host
> >> > of the server. This means that in a NiFi cluster, if the node I
> >> > specified as the cache server suddenly goes down, I cannot use the
> >> > cache until that node comes up again, right? Is there currently
> >> > a cache server in NiFi that supports HA? Thanks.
> >> >
> >> > Regards,
> >> > Ben
> >> >
> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokarumawak@gmail.com>:
> >> >
> >> >> Hi Ben,
> >> >>
> >> >> As you found from existing code, DistributedMapCache is used to share
> >> >> state among different processors, and it can be used by your custom
> >> >> processors, too.
> >> >> However, I'd recommend avoiding such tight dependencies between
> >> >> FlowFiles if possible, or at least minimizing the part of the flow
> >> >> that requires that constraint, for better performance and simplicity.
> >> >> For example, since a FlowFile can hold a fairly large amount of data,
> >> >> you could merge all FlowFiles into a single FlowFile instead of
> >> >> batches of FlowFiles. If you need logical boundaries, you can use the
> >> >> NiFi Record data model to embed multiple records within a FlowFile;
> >> >> Record should perform better.
> >> >>
> >> >> Hope this helps.
> >> >>
> >> >> Thanks,
> >> >> Koji
> >> >>
> >> >>
> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman713@gmail.com> wrote:
> >> >> > Hi guys, I'm trying to find a proper way in NiFi to sync status
> >> >> > between my custom processors.
> >> >> > Our requirement is as follows: we're doing ETL work with NiFi, and
> >> >> > I'm extracting data from a DB into batches of FlowFiles (each batch
> >> >> > ends with a flag FlowFile that marks the end of the batch).
> >> >> > Several groups of custom processors downstream need to process
> >> >> > these FlowFiles to apply business logic, and we expect these
> >> >> > processors to process one batch of FlowFiles at a time.
> >> >> > Therefore we need to implement a custom Wait processor (let's just
> >> >> > call it WaitBatch here) that holds all the other batches of
> >> >> > FlowFiles while the business processors are handling the batch
> >> >> > whose creation time is earlier.
> >> >> >
> >> >> > To implement this, all the WaitBatch processors placed in the flow
> >> >> > need to read and update entries in a shared map so that each set of
> >> >> > business-logic processors processes one batch at a time.
> >> >> > The entries are keyed by the batch number of the FlowFiles, and the
> >> >> > value of each entry is a batch release counter, which counts the
> >> >> > number of times the batch has passed through a WaitBatch processor.
> >> >> > When a batch is released by WaitBatch, the processor tries to
> >> >> > increment the batch number entry's value by 1, and then saves the
> >> >> > released batch number and counter value locally with StateManager.
> >> >> > When the next batch reaches the WaitBatch, the processor checks
> >> >> > whether the counter value of the previously released batch number
> >> >> > in the shared map is greater than the one saved locally. If the
> >> >> > entry for that batch number doesn't exist (already removed) or the
> >> >> > value in the shared map is greater, the next batch is released, and
> >> >> > the local state and the entry in the shared map are updated in the
> >> >> > same way.
> >> >> > At the end of the flow, a custom processor gets the batch number
> >> >> > from each batch and removes its entry from the shared map.
> >> >> >
> >> >> > So this implementation requires a shared map that can be read and
> >> >> > updated frequently and atomically. I checked the Wait/Notify
> >> >> > processors in NiFi and saw that they use the
> >> >> > DistributedMapCacheClientService and DistributedMapCacheServer to
> >> >> > sync status, so I'm wondering whether I could use the
> >> >> > DistributedMapCacheClientService to implement my logic. I also saw
> >> >> > another implementation called RedisDistributedMapCacheClientService,
> >> >> > which seems to require Redis (I haven't used Redis). Thanks in
> >> >> > advance for any suggestions.
> >> >> >
> >> >> > Regards,
> >> >> > Ben
> >> >>
> >>
>
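
The release rule described in the original question (shared map keyed by
batch number, release counter as the value, last released batch and counter
kept locally) can be sketched as below. This is only an illustration under
stated assumptions: an in-memory ConcurrentHashMap stands in for the shared
map, plain fields stand in for StateManager, and the class WaitBatchGate
and its method tryRelease are hypothetical names. A real flow would need an
equivalent atomic increment from whichever cache service is used.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of one WaitBatch processor's release decision. */
final class WaitBatchGate {
    // Stand-in for the shared map: batch number -> release counter.
    private final ConcurrentMap<String, Long> sharedCounters;
    // Stand-in for the local state kept with StateManager.
    private String lastReleasedBatch;
    private long lastReleasedCounter;

    WaitBatchGate(ConcurrentMap<String, Long> sharedCounters) {
        this.sharedCounters = sharedCounters;
    }

    /** Returns true if the batch may pass; false if it must keep waiting. */
    synchronized boolean tryRelease(String batchId) {
        if (lastReleasedBatch != null) {
            Long shared = sharedCounters.get(lastReleasedBatch);
            // The previous batch is done here if its entry was removed at the end
            // of the flow, or a later WaitBatch has incremented its counter.
            boolean previousDone = shared == null || shared > lastReleasedCounter;
            if (!previousDone) {
                return false;
            }
        }
        // merge() performs the read-increment-write as one atomic step on the map.
        lastReleasedCounter = sharedCounters.merge(batchId, 1L, Long::sum);
        lastReleasedBatch = batchId;
        return true;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Long> shared = new ConcurrentHashMap<>();
        WaitBatchGate gate = new WaitBatchGate(shared);
        System.out.println(gate.tryRelease("batch-1")); // first batch passes
        System.out.println(gate.tryRelease("batch-2")); // held: batch-1 still in progress
        shared.merge("batch-1", 1L, Long::sum);         // a later WaitBatch releases batch-1
        System.out.println(gate.tryRelease("batch-2")); // now passes
    }
}
```

The important property is that the increment is a single atomic operation on
the shared map; a separate get followed by a put would let two processors
overwrite each other's updates.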
