From: Koji Kawamura
Date: Wed, 27 Dec 2017 17:10:14 +0900
Subject: Re: proper way in nifi to sync status between custom processors
To: dev@nifi.apache.org

Hi Ben,

Excuse me, I'm trying, but I probably don't fully understand what you want to achieve with the flow.

It looks odd that WaitBatch fails with such a FlowFile repository error while another processor such as ReplaceText succeeds. I recommend testing WaitBatch alone first, without the database-related processors, by feeding it a test FlowFile that has the expected FlowFile attributes. Such input FlowFiles can be created with a GenerateFlowFile processor. If the same error happens with only the WaitBatch processor, it should be easier to debug.

Thanks,
Koji

On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura wrote:
> Hi Ben,
>
> The one thing that looks strange in the screenshot is ExecuteSqlCommand
> having FlowFiles queued in its incoming connection. Those should be
> transferred to the 'failure' relationship.
>
> Looking at the executeSql() method, shouldn't it re-throw the caught exception?
>
> try (Connection con = dbcpService.getConnection()) {
>     logger.debug("Setting autoCommit to false");
>     con.setAutoCommit(false);
>
>     try (Statement stmt = con.createStatement()) {
>         logger.info("Executing SQL statement: {}", new Object[]{sql});
>         stmt.execute(sql);
>
>         // all SQL statements execute within a single transaction
>         logger.debug("Committing transaction");
>         con.commit();
>     } catch (Exception ex) {
>         logger.error("Failed to execute SQL statement: {}", new Object[]{sql, ex});
>         con.rollback();
>         // re-throw the exception to the outer layer
>         throw ex;
>     } finally {
>         logger.debug("Resetting autoCommit to true");
>         con.setAutoCommit(true);
>     }
> } catch (Exception ex) {
>     // HERE, the exception is swallowed; that's why the FlowFiles stay in
>     // the incoming connection.
>     logger.error("Retrying SQL statement: {}", new Object[]{sql, ex});
>     retryOnFail = true;
> }
>
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 wrote:
>> Hi Koji, no problem.
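[Editor's note] The swallowed-exception problem Koji points out above can be reproduced in isolation with a small stand-alone sketch (plain Java, no NiFi API; the method names and the `Runnable` stand-in for the SQL call are hypothetical). When the outer catch only sets a flag, the caller sees a normal return and never routes the FlowFile to 'failure':

```java
// Minimal sketch contrasting the swallowing outer catch with a variant that
// re-throws, so the caller (e.g. onTrigger) can observe the failure and
// transfer the FlowFile to the 'failure' relationship.
public class SwallowDemo {
    static boolean retryOnFail = false;

    // Mirrors the posted executeSql(): the outer catch only sets a flag,
    // so the caller sees a normal return even though the SQL failed.
    static void executeSqlSwallowing(Runnable sql) {
        try {
            sql.run();            // rollback/cleanup elided
        } catch (Exception ex) {
            retryOnFail = true;   // BUG: failure is invisible to the caller
        }
    }

    // Re-throwing variant: the caller observes the failure and can react.
    static void executeSqlRethrowing(Runnable sql) {
        try {
            sql.run();
        } catch (Exception ex) {
            throw new RuntimeException("SQL execution failed", ex);
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new IllegalStateException("object already exists"); };

        executeSqlSwallowing(failing);
        System.out.println("swallowing variant returned normally, retryOnFail=" + retryOnFail);

        try {
            executeSqlRethrowing(failing);
        } catch (RuntimeException ex) {
            System.out.println("re-throwing variant surfaced: " + ex.getMessage());
        }
    }
}
```

In a real processor the re-thrown exception would typically be wrapped in a ProcessException so the framework or onTrigger() can roll back the session or route to 'failure'.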
>> You could check the code of the WaitBatch processor at the link:
>> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
>>
>> I also uploaded a snapshot of the part of the NiFi flow which includes
>> ExecuteSqlCommand and WaitBatch; you could check the picture at the link:
>> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
>>
>> You mentioned above that a FlowFile repository checkpointing failure will
>> cause other processors to process the same FlowFile again, but as you can
>> see from my snapshot image, ExecuteSqlCommand is the second processor,
>> placed before the WaitBatch processor. Even if the FlowFile repository
>> checkpointing failure is caused by WaitBatch, could it lead the processors
>> before it to process a FlowFile multiple times? Thanks.
>>
>> Regards,
>> Ben
>>
>> 2017-12-27 12:36 GMT+08:00 Koji Kawamura :
>>
>>> Hi Ben,
>>>
>>> I was referring to these two log messages in your previous email.
>>> These two messages are both written by ExecuteSqlCommand; it does not
>>> mean 'it was executed again'.
>>>
>>> ```
>>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>>> c.z.nifi.processors.ExecuteSqlCommand
>>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
>>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> dbo.ods_extractDataDebug;
>>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>>>
>>> and it was executed again later:
>>>
>>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>>> c.z.nifi.processors.ExecuteSqlCommand
>>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
>>> Failed to execute SQL statement: SELECT
>>> ```
>>>
>>> As you wrote, a FlowFile repository checkpointing failure will cause
>>> other processors to process the same FlowFiles again.
>>> However, there won't be a simple solution for every processor to roll
>>> back its job, as different processors do different things. Creating the
>>> temp table only if it does not exist seems like the right approach to me.
>>>
>>> At the same time, the root cause of the FlowFile repository failure
>>> should be investigated. Is it possible to share the WaitBatch code?
>>> The reason I ask is that every 'FlowFile Repository failed to update'
>>> in the log you shared earlier is related to the WaitBatch processor.
>>>
>>> Thanks,
>>> Koji
>>>
>>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 wrote:
>>> > Hi Koji, I do print the sql before actually executing it, but I checked
>>> > the error log line you mentioned in your reply; that error was thrown by
>>> > NiFi from within another processor called WaitBatch.
>>> > I didn't find similar errors from the ExecuteSqlCommand processor; I
>>> > think that's because only ExecuteSqlCommand is used to create temp
>>> > database tables.
>>> > You could check my ExecuteSqlCommand code via the link:
>>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
>>> >
>>> > If the error is really caused by a FlowFile repository checkpoint failure
>>> > and the FlowFile was executed twice, I may have to create the temp table
>>> > only if it doesn't exist. The reason I didn't fix the bug this way right
>>> > away is that I was afraid the fix could cover up some other problems.
>>> >
>>> > Thanks.
>>> >
>>> > Regards,
>>> > Ben
>>> >
>>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura :
>>> >
>>> >> Hi Ben,
>>> >>
>>> >> The following two log messages are very close in terms of written
>>> >> timestamp, but have different log levels:
>>> >> 2017-12-26 07:00:01,312 INFO
>>> >> 2017-12-26 07:00:01,315 ERROR
>>> >>
>>> >> I guess those are logged within a single onTrigger of your
>>> >> ExecuteSqlCommand custom processor: one before executing, the other
>>> >> when it caught an exception.
>>> >> Just guessing, as I don't have access to the code.
>>> >>
>>> >> Does the same issue happen with other processors bundled with Apache
>>> >> NiFi, without your custom processor running?
>>> >>
>>> >> If NiFi fails to update/checkpoint the FlowFile repository, then the
>>> >> same FlowFile can be processed again after restarting NiFi.
>>> >>
>>> >> Thanks,
>>> >> Koji
>>> >>
>>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 wrote:
>>> >> > Thanks Koji, I will look into this article about the record model.
>>> >> >
>>> >> > By the way, the error I previously mentioned to you occurred again; I
>>> >> > could see in the log that the sql query was executed twice. This time
>>> >> > I had turned on verbose NiFi logging. The sql query is as below:
>>> >> >
>>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>>> >> > c.z.nifi.processors.ExecuteSqlCommand
>>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
>>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> >> > dbo.ods_extractDataDebug;
>>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>>> >> >
>>> >> > and it was executed again later:
>>> >> >
>>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>>> >> > c.z.nifi.processors.ExecuteSqlCommand
>>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
>>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> >> > dbo.ods_extractDataDebug;
>>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;:
>>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
>>> >> > object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
>>> >> > object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>>> >> >     at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
>>> >> >     at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
>>> >> >     at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
>>> >> >     at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
>>> >> >     at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
>>> >> >     at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
>>> >> >     at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
>>> >> >     at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
>>> >> >     at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
>>> >> >     at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>>> >> >     at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>>> >> >     at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
>>> >> >     at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
>>> >> >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>>> >> >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
>>> >> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>>> >> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>>> >> >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>>> >> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> >> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> >> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> >> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> >> >     at java.lang.Thread.run(Thread.java:745)
>>> >> >
>>> >> > I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
>>> >> > Repository failed to update"; not sure if this is the reason the FlowFile
>>> >> > got processed twice. Could you help take a look at my log file? Thanks.
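[Editor's note] One way to make the temp-table creation idempotent, so a replayed FlowFile does not fail with "There is already an object named ...", is to wrap the DDL in a SQL Server OBJECT_ID guard. A sketch that only builds the statement text (the table names are taken from the log above; this is an assumption about how the fix could look, not the processor's actual code):

```java
// Sketch: building an idempotent version of the logged DDL. This only
// constructs the SQL text; executing it through the DBCP connection is
// left out.
public class GuardedDdl {
    static String guardedCreate(String tempTable, String sourceTable) {
        // OBJECT_ID(..., 'U') is non-NULL when a user table already exists,
        // so a replay of the same FlowFile becomes a no-op instead of an error.
        return "IF OBJECT_ID('" + tempTable + "', 'U') IS NULL\n"
             + "BEGIN\n"
             + "    SELECT TOP 0 * INTO " + tempTable + " FROM " + sourceTable + ";\n"
             + "    ALTER TABLE " + tempTable + " DROP COLUMN _id;\n"
             + "END";
    }

    public static void main(String[] args) {
        System.out.println(guardedCreate(
                "tmp.ods_extractDataDebug_20171226031801926_9195",
                "dbo.ods_extractDataDebug"));
    }
}
```

As Ben notes below, making the create idempotent treats the symptom; the FlowFile repository failure still deserves its own investigation.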
>>> >> > You could get the log file via the link:
>>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>>> >> >
>>> >> > Best Regards,
>>> >> > Ben
>>> >> >
>>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura :
>>> >> >
>>> >> >> Hi Ben,
>>> >> >>
>>> >> >> This blog post written by Mark would be a good starting point to get
>>> >> >> familiar with the NiFi Record model:
>>> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>>> >> >>
>>> >> >> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
>>> >> >> pair is not supported at the moment. If you need High Availability,
>>> >> >> RedisDistributedMapCacheClientService with Redis replication will
>>> >> >> provide that, though I haven't tried that myself.
>>> >> >> https://redis.io/topics/replication
>>> >> >>
>>> >> >> Thanks,
>>> >> >> Koji
>>> >> >>
>>> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 wrote:
>>> >> >> > Thanks for your quick response, Koji. I haven't seen anything about
>>> >> >> > the NiFi record data model while reading the NiFi documentation;
>>> >> >> > could you tell me where this model is documented? Thanks.
>>> >> >> >
>>> >> >> > By the way, to my knowledge, when you need to use the DistributedMapCacheServer
>>> >> >> > from DistributedMapCacheClientService, you need to specify the host
>>> >> >> > url for the server. This means that inside a NiFi cluster, if I
>>> >> >> > specify the cache server on a node and that node suddenly goes down,
>>> >> >> > I couldn't use it until the node comes up again, right? Is there
>>> >> >> > currently such a cache server in NiFi that could support HA? Thanks.
>>> >> >> > Regards,
>>> >> >> > Ben
>>> >> >> >
>>> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura :
>>> >> >> >
>>> >> >> >> Hi Ben,
>>> >> >> >>
>>> >> >> >> As you found from the existing code, DistributedMapCache is used to
>>> >> >> >> share state among different processors, and it can be used by your
>>> >> >> >> custom processors, too.
>>> >> >> >> However, I'd recommend avoiding such tight dependencies between
>>> >> >> >> FlowFiles if possible, or at least minimizing the part of the flow
>>> >> >> >> that requires that constraint, for better performance and simplicity.
>>> >> >> >> For example, since a FlowFile can hold a fairly large amount of data,
>>> >> >> >> you could merge all FlowFiles into a single FlowFile instead of
>>> >> >> >> batches of FlowFiles. If you need logical boundaries, you can use the
>>> >> >> >> NiFi Record data model to embed multiple records within a FlowFile;
>>> >> >> >> Record should perform better.
>>> >> >> >>
>>> >> >> >> Hope this helps.
>>> >> >> >>
>>> >> >> >> Thanks,
>>> >> >> >> Koji
>>> >> >> >>
>>> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman713@gmail.com> wrote:
>>> >> >> >> > Hi guys, I'm currently trying to find a proper way in nifi to sync
>>> >> >> >> > status between my custom processors.
>>> >> >> >> > Our requirement is like this: we're doing some ETL work using nifi,
>>> >> >> >> > and I'm extracting the data from a DB into batches of FlowFiles
>>> >> >> >> > (each batch of FlowFiles has a flag FlowFile indicating the end of
>>> >> >> >> > the batch).
>>> >> >> >> > There are some groups of custom processors downstream that need to
>>> >> >> >> > process these FlowFiles to do some business-logic work, and we
>>> >> >> >> > expect these processors to process one batch of FlowFiles at a time.
>>> >> >> >> > Therefore we need to implement a custom Wait processor (let's call
>>> >> >> >> > it WaitBatch here) to hold all the other batches of FlowFiles while
>>> >> >> >> > the business processors are handling the batch of FlowFiles whose
>>> >> >> >> > creation time is earlier.
>>> >> >> >> >
>>> >> >> >> > In order to implement this, all the WaitBatch processors placed in
>>> >> >> >> > the flow need to read/update records in a shared map so that each
>>> >> >> >> > set of business-logic processors processes one batch at a time.
>>> >> >> >> > The entries are keyed by the batch number of the FlowFiles, and the
>>> >> >> >> > value of each entry is a batch release counter which counts the
>>> >> >> >> > number of times the batch of FlowFiles has passed through a
>>> >> >> >> > WaitBatch processor.
>>> >> >> >> > When a batch is released by a WaitBatch, it tries to increment the
>>> >> >> >> > batch number entry's value by 1, and the released batch number and
>>> >> >> >> > counter value are also saved locally at the WaitBatch with the
>>> >> >> >> > StateManager; when the next batch reaches the WaitBatch, it checks
>>> >> >> >> > whether the counter value of the previously released batch number
>>> >> >> >> > in the shared map is greater than the one saved locally. If the
>>> >> >> >> > entry for that batch number doesn't exist (already removed) or the
>>> >> >> >> > value in the shared map is greater, the next batch is released, and
>>> >> >> >> > the local state and the entry in the shared map are updated
>>> >> >> >> > similarly.
>>> >> >> >> > At the end of the flow, a custom processor gets the batch number
>>> >> >> >> > from each batch and removes the entry from the shared map.
>>> >> >> >> > >>> >> >> >> > So this implementation requires a shared map that could >>> read/update >>> >> >> >> > frequently and atomically. I checked the Wait/Notify process= ors >>> in >>> >> >> NIFI >>> >> >> >> and >>> >> >> >> > saw it is using the DistributedMapCacheClientService and >>> >> >> >> > DistributedMapCacheServer to sync status, so I'm wondering i= f I >>> >> could >>> >> >> use >>> >> >> >> > the DistributedMapCacheClientService to implement my logic. = I >>> also >>> >> >> saw >>> >> >> >> > another implementation called RedisDistributedMapCacheClient >>> >> Service >>> >> >> >> > which seems to require Redis(I haven't used Redis). Thanks = in >>> >> advance >>> >> >> >> for >>> >> >> >> > any suggestions. >>> >> >> >> > >>> >> >> >> > Regards, >>> >> >> >> > Ben >>> >> >> >> >>> >> >> >>> >> >>>