From: 尹文才
Date: Tue, 26 Dec 2017 16:55:47 +0800
Subject: proper way in nifi to sync status between custom processors
To: dev@nifi.apache.org

Hi guys,

I'm trying to find a proper way in NiFi to sync status between my custom processors. Our requirement is as follows: we're doing ETL work with NiFi, and I'm extracting data from a DB into batches of FlowFiles (each batch has a flag FlowFile marking the end of the batch). Several groups of custom processors downstream need to process these FlowFiles to apply business logic, and we expect each group to process one batch at a time. We therefore need a custom Wait processor (let's call it WaitBatch) that holds all other batches while the business processors are handling the batch that was created earlier.

To implement this, all the WaitBatch processors in the flow need to read and update records in a shared map so that each set of business-logic processors handles one batch at a time. Entries are keyed by the batch number of the FlowFiles, and the value of each entry is a batch release counter: the number of times that batch has passed through a WaitBatch processor.

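To make the shared map concrete, here is a minimal sketch of the data structure described above. It assumes a plain in-JVM `ConcurrentHashMap` as a stand-in for whatever distributed cache ends up being used, so it only illustrates the semantics (batch number as key, release counter as value, atomic increment). The class and method names (`BatchReleaseCounters`, `recordRelease`, `currentCount`, `remove`) are hypothetical and not part of NiFi.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the shared map: batch number -> release counter.
// A ConcurrentHashMap only covers a single JVM; a clustered flow would need a
// distributed cache that offers equivalent atomic operations.
class BatchReleaseCounters {
    private final ConcurrentMap<String, Long> counters = new ConcurrentHashMap<>();

    // Atomically add 1 to the batch's counter, creating it at 1 if absent.
    long recordRelease(String batchNumber) {
        return counters.merge(batchNumber, 1L, Long::sum);
    }

    // Current counter value, or -1 if the entry does not exist.
    long currentCount(String batchNumber) {
        return counters.getOrDefault(batchNumber, -1L);
    }

    // Drop the batch's entry once the batch has left the flow.
    void remove(String batchNumber) {
        counters.remove(batchNumber);
    }

    public static void main(String[] args) {
        BatchReleaseCounters shared = new BatchReleaseCounters();
        shared.recordRelease("batch-1"); // first WaitBatch releases the batch
        shared.recordRelease("batch-1"); // second WaitBatch releases the batch
        System.out.println("batch-1 releases: " + shared.currentCount("batch-1"));
        shared.remove("batch-1");
        System.out.println("batch-1 present: " + (shared.currentCount("batch-1") >= 0));
    }
}
```

The increment goes through `merge` so that the read-modify-write is a single atomic step; a separate get followed by a put could lose updates when two WaitBatch processors touch the same batch at once.
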
When WaitBatch releases a batch, it tries to increment the value of that batch number's entry by 1 and then saves the released batch number and counter value locally through the StateManager. When the next batch reaches the WaitBatch, it checks whether the counter value of the previously released batch number in the shared map is greater than the one saved locally. If the entry for that batch number doesn't exist (already removed) or the value in the shared map is greater, the next batch is released, and the local state and the shared-map entry are updated in the same way. At the end of the flow, a custom processor gets the batch number from each batch and removes its entry from the shared map.

So this implementation requires a shared map that can be read and updated frequently and atomically. I checked the Wait/Notify processors in NiFi and saw that they use DistributedMapCacheClientService and DistributedMapCacheServer to sync status, so I'm wondering whether I could use DistributedMapCacheClientService to implement my logic. I also saw another implementation called RedisDistributedMapCacheClientService, which seems to require Redis (I haven't used Redis).

Thanks in advance for any suggestions.

Regards,
Ben