nifi-dev mailing list archives

From Koji Kawamura <ijokaruma...@gmail.com>
Subject Re: proper way in nifi to sync status between custom processors
Date Wed, 27 Dec 2017 08:10:14 GMT
Hi Ben,

Sorry, I'm trying, but I probably don't fully understand what you
want to achieve with the flow.

It is odd that WaitBatch fails with such a FlowFile repository
error while other processors, such as ReplaceText, succeed.
I recommend testing WaitBatch alone first, without the
database-related processors, by feeding it a test FlowFile that has the
expected FlowFile attributes.
Such input FlowFiles can be created with the GenerateFlowFile processor.
If the same error happens with only the WaitBatch processor, it
should be easier to debug.

Thanks,
Koji

On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokarumawak@gmail.com> wrote:
> Hi Ben,
>
> One thing that looks strange in the screenshot is that ExecuteSqlCommand
> has FlowFiles queued in its incoming connection.
> Those should be transferred to the 'failure' relationship.
>
> In the following executeSql() method, shouldn't it re-throw the caught
> exception?
>
>
>             try (Connection con = dbcpService.getConnection()) {
>                 logger.debug("设置autoCommit为false"); // "set autoCommit to false"
>                 con.setAutoCommit(false);
>
>                 try (Statement stmt = con.createStatement()) {
>                     logger.info("执行sql语句: {}", new Object[]{sql}); // "executing SQL statement"
>                     stmt.execute(sql);
>
>                     // All SQL statements are executed within one transaction
>                     logger.debug("提交transaction"); // "committing transaction"
>                     con.commit();
>                 } catch (Exception ex) {
>                     logger.error("执行sql语句失败:{}", new Object[]{sql, ex}); // "failed to execute SQL statement"
>                     con.rollback();
>                     // Re-throw the exception so the outer block handles it
>                     throw ex;
>                 } finally {
>                     logger.debug("重新设置autoCommit为true"); // "resetting autoCommit to true"
>                     con.setAutoCommit(true);
>                 }
>             } catch (Exception ex) {
>                 // HERE, the exception is swallowed; that's why the FlowFiles stay in
>                 // the incoming connection.
>                 logger.error("重试执行sql语句:{}", new Object[]{sql, ex}); // "retrying SQL statement"
>                 retryOnFail = true;
>             }
>
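A minimal sketch of the difference between swallowing and re-throwing, in plain Java with no NiFi or JDBC dependency. The names `RethrowSketch`, `Route`, `SqlAction`, `routeSwallowing` and `routePropagating` are hypothetical. How the real processor reacts to `retryOnFail` is not shown in the thread, so the sketch assumes it leaves the FlowFile in the incoming queue, and that `FAILURE` stands for a transfer to the 'failure' relationship.

```java
import java.sql.SQLException;

/** Hypothetical stand-in for a processor deciding where a FlowFile goes. */
final class RethrowSketch {

    enum Route { SUCCESS, FAILURE, STAYS_QUEUED }

    /** Hypothetical unit of work that may fail, e.g. executing a SQL statement. */
    interface SqlAction {
        void run() throws SQLException;
    }

    /** Mirrors the quoted code: the exception is caught and only a flag is set. */
    static Route routeSwallowing(SqlAction action) {
        boolean retryOnFail = false;
        try {
            action.run();
        } catch (SQLException ex) {
            retryOnFail = true; // swallowed; the caller never sees the exception
        }
        // Assumption: a set flag means the FlowFile is left in the incoming queue.
        return retryOnFail ? Route.STAYS_QUEUED : Route.SUCCESS;
    }

    /** Logs the failure and re-throws it so the caller can react. */
    static void executeSql(SqlAction action) throws SQLException {
        try {
            action.run();
        } catch (SQLException ex) {
            System.err.println("SQL failed: " + ex.getMessage());
            throw ex;
        }
    }

    /** Caller that routes to FAILURE because the exception reaches it. */
    static Route routePropagating(SqlAction action) {
        try {
            executeSql(action);
            return Route.SUCCESS;
        } catch (SQLException ex) {
            return Route.FAILURE;
        }
    }

    public static void main(String[] args) {
        SqlAction failing = () -> { throw new SQLException("object already exists"); };
        System.out.println(routeSwallowing(failing));
        System.out.println(routePropagating(failing));
    }
}
```

With the same failing action, the first variant keeps retrying the same input while the second gives the flow a chance to handle the failed FlowFile downstream.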
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman713@gmail.com> wrote:
>> Hi Koji, no problem. You can check the code of the WaitBatch processor at
>> this link:
>> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
>>
>> I also uploaded a snapshot of the part of the NiFi flow that includes
>> ExecuteSqlCommand and WaitBatch; you can check the picture at this link:
>> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
>>
>> You mentioned above that a FlowFile repository checkpoint failure can cause
>> other processors to process the same FlowFile again. But as you can see
>> from my snapshot image, ExecuteSqlCommand is the second processor and sits
>> before the WaitBatch processor. Even if the FlowFile repository
>> checkpointing failure is caused by WaitBatch, could it lead the
>> processors before it to process a FlowFile multiple times? Thanks.
>>
>> Regards,
>> Ben
>>
>> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokarumawak@gmail.com>:
>>
>>> Hi Ben,
>>>
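>>> I was referring to these two log messages in your previous email.
>>> Both messages are written by ExecuteSqlCommand (the INFO message reads
>>> "executing SQL statement" and the ERROR message reads "failed to execute
>>> SQL statement"); they do not mean 'it was executed again'.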
>>>
>>> ```
>>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>>> c.z.nifi.processors.ExecuteSqlCommand
>>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句: SELECT
>>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> dbo.ods_extractDataDebug;
>>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
>>> _id;
>>>
>>> and it was executed again later:
>>>
>>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>>> c.z.nifi.processors.ExecuteSqlCommand
>>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
>>> 执行sql语句失败:SELECT
>>> ```
>>>
>>> As you wrote, when the FlowFile repository fails checkpointing, other
>>> processors may process the same FlowFiles again. However, there is no
>>> simple way for every processor to roll back its work, because different
>>> processors do different things. Creating the temp table only if it does
>>> not exist seems the right approach to me.
>>>
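A minimal sketch of the "create only if it does not exist" idea, assuming SQL Server (the stack trace in this thread shows the Microsoft JDBC driver) and its `OBJECT_ID` function. The class and method names are hypothetical, and the table names are assumed to be trusted, schema-qualified identifiers, since they are concatenated into the statement.

```java
/** Hypothetical helper that builds a re-runnable T-SQL batch. */
final class IdempotentTempTable {

    /**
     * Builds a T-SQL batch that creates targetTable as an empty copy of
     * sourceTable only when targetTable does not exist yet.
     * Assumption: both names are trusted, schema-qualified identifiers.
     */
    static String createIfAbsent(String sourceTable, String targetTable) {
        return "IF OBJECT_ID(N'" + targetTable + "', N'U') IS NULL\n"
             + "BEGIN\n"
             + "    SELECT TOP 0 * INTO " + targetTable + " FROM " + sourceTable + ";\n"
             + "END";
    }

    public static void main(String[] args) {
        // Table names taken from the log quoted in this thread.
        System.out.println(createIfAbsent(
                "dbo.ods_extractDataDebug",
                "tmp.ods_extractDataDebug_20171226031801926_9195"));
    }
}
```

The follow-up `alter table ... drop column _id` from the log would presumably need to sit inside the same guarded block, so that it is also skipped when the FlowFile is processed a second time.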
>>> At the same time, the root cause of the FlowFile repository failure
>>> should be investigated. Is it possible to share the WaitBatch code?
>>> I ask because every 'FlowFile Repository failed to update' message in the
>>> log that you shared earlier is related to the WaitBatch processor.
>>>
>>> Thanks,
>>> Koji
>>>
>>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman713@gmail.com> wrote:
>>> > Hi Koji, I will print the SQL before actually executing it. I checked
>>> > the error log line you mentioned in your reply; that error was thrown by
>>> > NiFi from within another processor called WaitBatch.
>>> > I didn't find errors similar to the one from the ExecuteSqlCommand
>>> > processor; I think that's because only ExecuteSqlCommand is used to
>>> > create temp database tables.
>>> > You can check my ExecuteSqlCommand code via the link:
>>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
>>> >
>>> > If the error is really caused by a FlowFile repository checkpoint failure
>>> > and the FlowFile was processed twice, I may have to create the temp table
>>> > only if it doesn't exist. I didn't fix the bug this way right away
>>> > because I was afraid the fix could mask other problems.
>>> >
>>> > Thanks.
>>> >
>>> > Regards,
>>> > Ben
>>> >
>>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokarumawak@gmail.com>:
>>> >
>>> >> Hi Ben,
>>> >>
>>> >> The following two log messages have very close timestamps but
>>> >> different log levels.
>>> >> 2017-12-26 07:00:01,312 INFO
>>> >> 2017-12-26 07:00:01,315 ERROR
>>> >>
>>> >> I guess those are logged within a single onTrigger of your
>>> >> ExecuteSqlCommand custom processor: one before executing, the other
>>> >> when it caught an exception. Just guessing, as I don't have access
>>> >> to the code.
>>> >>
>>> >> Does the same issue happen with other processors bundled with Apache
>>> >> NiFi, without your custom processor running?
>>> >>
>>> >> If NiFi fails to update/checkpoint the FlowFile repository, then the same
>>> >> FlowFile can be processed again after restarting NiFi.
>>> >>
>>> >> Thanks,
>>> >> Koji
>>> >>
>>> >>
>>> >>
>>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman713@gmail.com> wrote:
>>> >> > Thanks Koji, I will look into this article about the record model.
>>> >> >
>>> >> > By the way, the error I previously mentioned to you occurred again. I
>>> >> > could see in the log that the SQL query was executed twice. This time I
>>> >> > had turned on verbose NiFi logging; the SQL query is as below:
>>> >> >
>>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>>> >> > c.z.nifi.processors.ExecuteSqlCommand
>>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句: SELECT
>>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> >> > dbo.ods_extractDataDebug;
>>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>>> >> >
>>> >> > and it was executed again later:
>>> >> >
>>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>>> >> > c.z.nifi.processors.ExecuteSqlCommand
>>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句失败:SELECT
>>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> >> > dbo.ods_extractDataDebug;
>>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
>>> >> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
>>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。
>>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
>>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。
>>> >> > [English: There is already an object named
>>> >> > 'ods_extractDataDebug_20171226031801926_9195' in the database.]
>>> >> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
>>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
>>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
>>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
>>> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
>>> >> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
>>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
>>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
>>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
>>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
>>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
>>> >> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>>> >> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
>>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>>> >> > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>>> >> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> >> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> >> > at java.lang.Thread.run(Thread.java:745)
>>> >> >
>>> >> > I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
>>> >> > Repository failed to update"; I'm not sure whether this is the reason
>>> >> > the FlowFile got processed twice. Could you help take a look at my log
>>> >> > file? Thanks.
>>> >> > You can get the log file via the link:
>>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>>> >> >
>>> >> > Best Regards,
>>> >> > Ben
>>> >> >
>>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokarumawak@gmail.com>:
>>> >> >
>>> >> >> Hi Ben,
>>> >> >>
>>> >> >> This blog post written by Mark would be a good starting point to get
>>> >> >> familiar with the NiFi Record model.
>>> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>>> >> >>
>>> >> >> HA for the DistributedMapCacheClientService and
>>> >> >> DistributedMapCacheServer pair is not supported at the moment. If you
>>> >> >> need high availability, RedisDistributedMapCacheClientService with
>>> >> >> Redis replication will provide that, though I haven't tried it myself.
>>> >> >> https://redis.io/topics/replication
>>> >> >>
>>> >> >> Thanks,
>>> >> >> Koji
>>> >> >>
>>> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman713@gmail.com> wrote:
>>> >> >> > Thanks for your quick response, Koji. I hadn't heard or seen anything
>>> >> >> > about the NiFi record data model when I was reading the NiFi
>>> >> >> > documentation. Could you tell me where this model is documented?
>>> >> >> > Thanks.
>>> >> >> >
>>> >> >> > By the way, to my knowledge, to use the DistributedMapCacheServer from
>>> >> >> > DistributedMapCacheClientService, you need to specify the host URL of
>>> >> >> > the server. This means that inside a NiFi cluster, if the node I
>>> >> >> > specified as the cache server suddenly goes down, I can't use the
>>> >> >> > cache until the node comes up again, right? Is there currently a
>>> >> >> > cache server in NiFi that supports HA? Thanks.
>>> >> >> >
>>> >> >> > Regards,
>>> >> >> > Ben
>>> >> >> >
>>> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokarumawak@gmail.com>:
>>> >> >> >
>>> >> >> >> Hi Ben,
>>> >> >> >>
>>> >> >> >> As you found from the existing code, DistributedMapCache is used to
>>> >> >> >> share state among different processors, and it can be used by your
>>> >> >> >> custom processors, too.
>>> >> >> >> However, I'd recommend avoiding such tight dependencies between
>>> >> >> >> FlowFiles if possible, or at least minimizing the part of the flow
>>> >> >> >> that requires that constraint, for better performance and
>>> >> >> >> simplicity.
>>> >> >> >> For example, since a FlowFile can hold a fairly large amount of
>>> >> >> >> data, you could merge all FlowFiles into a single FlowFile instead
>>> >> >> >> of batches of FlowFiles. If you need logical boundaries, you can
>>> >> >> >> use the NiFi Record data model to embed multiple records within a
>>> >> >> >> FlowFile; Record should perform better.
>>> >> >> >>
>>> >> >> >> Hope this helps.
>>> >> >> >>
>>> >> >> >> Thanks,
>>> >> >> >> Koji
>>> >> >> >>
>>> >> >> >>
>>> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman713@gmail.com> wrote:
>>> >> >> >> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
>>> >> >> >> > status between my custom processors.
>>> >> >> >> > Our requirement is this: we're doing some ETL work using NiFi, and
>>> >> >> >> > I'm extracting the data from a DB into batches of FlowFiles (each
>>> >> >> >> > batch of FlowFiles has a flag FlowFile indicating the end of the
>>> >> >> >> > batch).
>>> >> >> >> > There are some groups of custom processors downstream that need to
>>> >> >> >> > process these FlowFiles to do some business logic work, and we
>>> >> >> >> > expect these processors to process one batch of FlowFiles at a
>>> >> >> >> > time.
>>> >> >> >> > Therefore we need to implement a custom Wait processor (let's just
>>> >> >> >> > call it WaitBatch here) to hold all the other batches of FlowFiles
>>> >> >> >> > while the business processors are handling the batch of FlowFiles
>>> >> >> >> > whose creation time is earlier.
>>> >> >> >> >
>>> >> >> >> > In order to implement this, all the WaitBatch processors placed in
>>> >> >> >> > the flow need to read/update records in a shared map so that each
>>> >> >> >> > set of business-logic processors processes one batch at a time.
>>> >> >> >> > The entries are keyed by the batch number of the FlowFiles, and
>>> >> >> >> > the value of each entry is a batch release counter that counts the
>>> >> >> >> > number of times the batch of FlowFiles has passed through a
>>> >> >> >> > WaitBatch processor.
>>> >> >> >> > When a batch is released by WaitBatch, it will try to increment
>>> >> >> >> > the batch number entry's value by 1, and the released batch number
>>> >> >> >> > and counter value will also be saved locally at the WaitBatch with
>>> >> >> >> > StateManager.
>>> >> >> >> > When the next batch reaches the WaitBatch, it will check whether
>>> >> >> >> > the counter value of the previously released batch number in the
>>> >> >> >> > shared map is greater than the one saved locally. If the entry for
>>> >> >> >> > that batch number doesn't exist (already removed) or the value in
>>> >> >> >> > the shared map is greater, the next batch will be released, and
>>> >> >> >> > the local state and the entry in the shared map will be updated
>>> >> >> >> > similarly.
>>> >> >> >> > At the end of the flow, a custom processor will get the batch
>>> >> >> >> > number from each batch and remove the entry from the shared map.
>>> >> >> >> >
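A minimal sketch of the release rule described above, in plain Java. A `ConcurrentHashMap` stands in for the shared map and two fields stand in for the StateManager state; the class name `WaitBatchSketch` and the method `tryRelease` are hypothetical, and this is not the actual WaitBatch code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of one WaitBatch gate; not the real processor. */
final class WaitBatchSketch {

    /** Stand-in for the shared map: batch number -> release counter. */
    private final ConcurrentMap<String, Long> shared;

    /** Stand-ins for the locally saved state (StateManager in the description). */
    private String lastReleasedBatch; // null until the first release
    private long lastReleasedCounter;

    WaitBatchSketch(ConcurrentMap<String, Long> shared) {
        this.shared = shared;
    }

    /** Returns true if the batch is released, false if it must keep waiting. */
    synchronized boolean tryRelease(String batch) {
        if (lastReleasedBatch != null) {
            Long current = shared.get(lastReleasedBatch);
            // Release only if the previous batch was removed at the end of the
            // flow, or has passed another gate since this gate released it.
            boolean previousMovedOn = current == null || current > lastReleasedCounter;
            if (!previousMovedOn) {
                return false;
            }
        }
        long counter = shared.merge(batch, 1L, Long::sum); // atomic increment
        lastReleasedBatch = batch;
        lastReleasedCounter = counter;
        return true;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Long> shared = new ConcurrentHashMap<>();
        WaitBatchSketch first = new WaitBatchSketch(shared);
        WaitBatchSketch second = new WaitBatchSketch(shared);
        System.out.println(first.tryRelease("A"));  // first batch passes gate 1
        System.out.println(first.tryRelease("B"));  // B waits while A is in progress
        System.out.println(second.tryRelease("A")); // A reaches gate 2
        System.out.println(first.tryRelease("B"));  // now B may pass gate 1
    }
}
```

In a cluster the increment would have to be atomic on the cache server side rather than inside one JVM, which is the point of the question that follows.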
>>> >> >> >> > So this implementation requires a shared map that can be read and
>>> >> >> >> > updated frequently and atomically. I checked the Wait/Notify
>>> >> >> >> > processors in NiFi and saw that they use the
>>> >> >> >> > DistributedMapCacheClientService and DistributedMapCacheServer to
>>> >> >> >> > sync status, so I'm wondering if I could use the
>>> >> >> >> > DistributedMapCacheClientService to implement my logic. I also saw
>>> >> >> >> > another implementation called RedisDistributedMapCacheClientService,
>>> >> >> >> > which seems to require Redis (I haven't used Redis). Thanks in
>>> >> >> >> > advance for any suggestions.
>>> >> >> >> >
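A minimal sketch of what "read/update atomically" can look like when the map only offers a read plus a conditional replace: read the current value, compute the new one, and retry if another writer got there first. It uses `java.util.concurrent.ConcurrentMap` as a stand-in for the shared cache; `OptimisticCounter` and `increment` are hypothetical names, and a real cache client would have its own calls for the same steps.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical optimistic counter on top of a shared map. */
final class OptimisticCounter {

    /** Atomically increments the counter for key and returns the new value. */
    static long increment(ConcurrentMap<String, Long> map, String key) {
        while (true) {
            Long current = map.get(key);
            if (current == null) {
                // No entry yet: succeed only if nobody else created it meanwhile.
                if (map.putIfAbsent(key, 1L) == null) {
                    return 1L;
                }
            } else {
                long next = current + 1;
                // Succeed only if the value is still the one that was read.
                if (map.replace(key, current, next)) {
                    return next;
                }
            }
            // Another writer won the race; re-read and try again.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    increment(map, "batch-1");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println(map.get("batch-1"));
    }
}
```

Because every successful update is conditional on the value that was read, concurrent increments are never lost; the cost is an occasional retry under contention.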
>>> >> >> >> > Regards,
>>> >> >> >> > Ben
>>> >> >> >>
>>> >> >>
>>> >>
>>>
