From commits-return-44714-archive-asf-public=cust-asf.ponee.io@nifi.apache.org Tue Jun 1 12:38:35 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id B81F4180636 for ; Tue, 1 Jun 2021 14:38:35 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id F07943F42F for ; Tue, 1 Jun 2021 12:38:34 +0000 (UTC) Received: (qmail 41355 invoked by uid 500); 1 Jun 2021 12:38:34 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 41346 invoked by uid 99); 1 Jun 2021 12:38:34 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Jun 2021 12:38:34 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 5CC1181A86; Tue, 1 Jun 2021 12:38:34 +0000 (UTC) Date: Tue, 01 Jun 2021 12:38:34 +0000 To: "commits@nifi.apache.org" Subject: [nifi] branch main updated: NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <162255111365.1685.4628140219563869087@gitbox.apache.org> From: tpalfy@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: nifi X-Git-Refname: refs/heads/main X-Git-Reftype: branch X-Git-Oldrev: 96d521159bebda2f1270b023d677c0bdff7ca1d6 X-Git-Newrev: f23dcb05f6d6c869fab453634bf094a01b42d121 X-Git-Rev: f23dcb05f6d6c869fab453634bf094a01b42d121 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. tpalfy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git The following commit(s) were added to refs/heads/main by this push: new f23dcb0 NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation f23dcb0 is described below commit f23dcb05f6d6c869fab453634bf094a01b42d121 Author: Timea Barna AuthorDate: Mon May 31 12:52:40 2021 +0200 NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation This closes #5109. Signed-off-by: Tamas Palfy --- .../org/apache/nifi/web/util/SnippetUtils.java | 146 ++++++++++++++------- 1 file changed, 97 insertions(+), 49 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index bde2f95..5d3cfc6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -75,6 +75,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; /** * Template utilities. @@ -145,8 +146,9 @@ public final class SnippetUtils { // we are talking only about the DTO objects that make up the snippet. We do not actually modify the Process Group or the // Controller Services in our flow themselves!) final Set allServicesReferenced = new HashSet<>(); - final Map contentsByGroup = new HashMap<>(); - contentsByGroup.put(processGroup.getIdentifier(), snippetDto); + final Map contentsByGroup = new HashMap<>(); + final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse); + contentsByGroup.put(processGroup.getIdentifier(), highestProcessGroupDTO); // add any processors final Set controllerServices = new HashSet<>(); @@ -162,13 +164,13 @@ public final class SnippetUtils { if (includeControllerServices) { // Include all referenced services that are not already included in this snippet. getControllerServices(processor.getEffectivePropertyValues()).stream() - .filter(allServicesReferenced::add) - .forEach(svc -> { - final String svcGroupId = svc.getParentGroupId(); - final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier(); - svc.setParentGroupId(destinationGroupId); - controllerServices.add(svc); - }); + .filter(allServicesReferenced::add) + .forEach(svc -> { + final String svcGroupId = svc.getParentGroupId(); + final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier(); + svc.setParentGroupId(destinationGroupId); + controllerServices.add(svc); + }); } } } @@ -234,42 +236,33 @@ public final class SnippetUtils { } // add any process groups - final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse); - final Set processGroups = highestProcessGroupDTO.getContents().getProcessGroups(); - fillContentsByGroupMap(highestProcessGroupDTO, contentsByGroup); - - // Maintain a listing of visited groups starting with each group in the snippet. - // This is used to determine whether a referenced controller service should be included in the resulting snippet. - // If the service is defined at groupId or one of it's ancestors, its considered outside of this snippet - // and will only be included when the includeControllerServices is set to true. - // This happens above when considering the processors in this snippet. - final Set visitedGroupIds = new HashSet<>(); - for (final String groupIdentifier : contentsByGroup.keySet()) { - - // Include this group in the ancestry for this snippet, services only get included if the includeControllerServices - // flag is set or if the service is defined within this groups hierarchy within the snippet. - if (!groupIdentifier.equals(processGroup.getIdentifier())) { - visitedGroupIds.add(groupIdentifier); - } - - for (final ProcessorNode procNode : flowController.getFlowManager().getGroup(groupIdentifier).getProcessors()) { - // Include all referenced services that are not already included in this snippet. - getControllerServices(procNode.getEffectivePropertyValues()).stream() - .filter(allServicesReferenced::add) - .filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId())) - .forEach(svc -> { - final String svcGroupId = svc.getParentGroupId(); - final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier(); - svc.setParentGroupId(destinationGroupId); - final FlowSnippetDTO contents = contentsByGroup.get(destinationGroupId); - Set services = contents.getControllerServices(); - if (services == null) { - contents.setControllerServices(Collections.singleton(svc)); - } else { - services.add(svc); - contents.setControllerServices(services); - } - }); + final Set processGroups = new LinkedHashSet<>(); + if (!snippet.getProcessGroups().isEmpty()) { + Set snippetGroupIds = snippet.getProcessGroups().keySet(); + + for (final ProcessGroupDTO group: highestProcessGroupDTO.getContents().getProcessGroups()) { + if (snippetGroupIds.contains(group.getId())) { + contentsByGroup.put(group.getId(), group); + addChildren(group, contentsByGroup); + } + } + + for (final String childGroupId : snippet.getProcessGroups().keySet()) { + final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId); + if (childGroup == null) { + throw new IllegalStateException("A process group in this snippet could not be found."); + } + + ProcessGroupDTO childGroupDto = contentsByGroup.get(childGroupId); + + // maintain a listing of visited groups starting with each group in the snippet. this is used to determine + // whether a referenced controller service should be included in the resulting snippet. if the service is + // defined at groupId or one of it's ancestors, its considered outside of this snippet and will only be included + // when the includeControllerServices is set to true. this happens above when considering the processors in this snippet + final Set visitedGroupIds = new HashSet<>(); + addControllerServices(childGroup, childGroupDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, processGroup.getIdentifier()); + + processGroups.add(childGroupDto); } } @@ -285,6 +278,7 @@ public final class SnippetUtils { } } + // Normalize the coordinates based on the locations of the other components final List components = new ArrayList<>(); components.addAll((Set) processors); @@ -297,7 +291,7 @@ public final class SnippetUtils { components.addAll((Set) remoteProcessGroups); normalizeCoordinates(components); - Set updatedControllerServices = snippetDto.getControllerServices(); + Set updatedControllerServices = contentsByGroup.get(processGroup.getIdentifier()).getContents().getControllerServices(); if (updatedControllerServices == null) { updatedControllerServices = new HashSet<>(); } @@ -316,10 +310,63 @@ public final class SnippetUtils { return snippetDto; } - private void fillContentsByGroupMap(final ProcessGroupDTO processGroup, final Map contentByGroupMap) { + private void addChildren(final ProcessGroupDTO processGroup, final Map contentByGroupMap) { for (final ProcessGroupDTO group: processGroup.getContents().getProcessGroups()) { - contentByGroupMap.put(group.getId(), group.getContents()); - fillContentsByGroupMap(group, contentByGroupMap); + contentByGroupMap.put(group.getId(), group); + addChildren(group, contentByGroupMap); + } + } + + /** + * Finds all Controller Services that are referenced in the given Process Group (and child Process Groups, recursively), and + * adds them to the given servicesByGroup map + * + * @param group the Process Group to start from + * @param contents Process Group's contents + * @param allServicesReferenced a Set of all Controller Service DTO's that have already been referenced; used to dedupe services + * @param contentsByGroup a Map of Process Group ID to the Process Group's contents + * @param highestGroupId the UUID of the 'highest' process group in the snippet + */ + private void addControllerServices(final ProcessGroup group, final FlowSnippetDTO contents, final Set allServicesReferenced, + final boolean includeControllerServices, final Set visitedGroupIds, final Map contentsByGroup, final String highestGroupId) { + + // include this group in the ancestry for this snippet, services only get included if the includeControllerServices + // flag is set or if the service is defined within this groups hierarchy within the snippet + visitedGroupIds.add(group.getIdentifier()); + + for (final ProcessorNode procNode : group.getProcessors()) { + // Include all referenced services that are not already included in this snippet. + getControllerServices(procNode.getEffectivePropertyValues()).stream() + .filter(allServicesReferenced::add) + .filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId())) + .forEach(svc -> { + final String svcGroupId = svc.getParentGroupId(); + final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : highestGroupId; + svc.setParentGroupId(destinationGroupId); + final FlowSnippetDTO snippetDto = contentsByGroup.get(destinationGroupId).getContents(); + if (snippetDto != null) { + Set services = snippetDto.getControllerServices(); + if (services == null) { + snippetDto.setControllerServices(Collections.singleton(svc)); + } else { + services.add(svc); + snippetDto.setControllerServices(services); + } + } + }); + } + + // Map child process group ID to the child process group for easy lookup + final Map childGroupMap = contents.getProcessGroups().stream() + .collect(Collectors.toMap(ComponentDTO::getId, childGroupDto -> childGroupDto)); + + for (final ProcessGroup childGroup : group.getProcessGroups()) { + final ProcessGroupDTO childDto = childGroupMap.get(childGroup.getIdentifier()); + if (childDto == null) { + continue; + } + + addControllerServices(childGroup, childDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, highestGroupId); } } @@ -345,6 +392,7 @@ public final class SnippetUtils { return serviceDtos; } + public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed, boolean isCopy) { final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy); resolveNameConflicts(snippetCopy, group);