nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi] jsferner commented on a change in pull request #4023: NIFI-6873: Added support for replacing a process group via import
Date Thu, 06 Feb 2020 13:35:42 GMT
jsferner commented on a change in pull request #4023: NIFI-6873: Added support for replacing a process group via import
URL: https://github.com/apache/nifi/pull/4023#discussion_r375835725
 
 

 ##########
 File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
 ##########
 @@ -3894,6 +3889,252 @@ public Response createControllerService(
         );
     }
 
+    /**
+     * Initiates the request to replace the Process Group with the given ID with the Process Group in the given import entity
+     *
+     * @param groupId          The id of the process group to replace
+     * @param importEntity     A request entity containing revision info and the process group to replace with
+     * @return A ProcessGroupReplaceRequestEntity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/replace-requests")
+    @ApiOperation(
+            value = "Initiate the Replace Request of a Process Group with the given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "This will initiate the action of replacing a process group with the
given process group. This can be a lengthy "
+                    + "process, as it will stop any Processors and disable any Controller
Services necessary to perform the action and then restart them. As a result, "
+                    + "the endpoint will immediately return a ProcessGroupReplaceRequestEntity,
and the process of replacing the flow will occur "
+                    + "asynchronously in the background. The client may then periodically
poll the status of the request by issuing a GET request to "
+                    + "/process-groups/replace-requests/{requestId}. Once the request is
completed, the client is expected to issue a DELETE request to "
+                    + "/process-groups/replace-requests/{requestId}. " + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Read - /process-groups/{uuid}"),
+                    @Authorization(value = "Write - /process-groups/{uuid}"),
+                    @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated
components"),
+                    @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated
components"),
+                    @Authorization(value = "Write - if the template contains any restricted
components - /restricted-components"),
+                    @Authorization(value = "Read - /parameter-contexts/{uuid} - For any Parameter
Context that is referenced by a Property that is changed, added, or removed")
+            }
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because
it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in
the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response initiateReplaceProcessGroup(@ApiParam(value = "The process group id.",
required = true) @PathParam("id") final String groupId,
+                                                @ApiParam(value = "The process group replace
request entity", required = true) final ProcessGroupImportEntity importEntity) {
+        // replacing a flow under version control is not permitted via import. Versioned flows have additional requirements to allow
+        // them only to be replaced by a different version of the same flow.
+        if (serviceFacade.isAnyProcessGroupUnderVersionControl(groupId)) {
+            throw new IllegalStateException("Cannot replace a Process Group via import while
it or its descendants are under Version Control.");
+        }
+
+        final VersionedFlowSnapshot versionedFlowSnapshot = importEntity.getVersionedFlowSnapshot();
+        if (versionedFlowSnapshot == null) {
+            throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied");
+        }
+
+        return initiateFlowUpdate(groupId, importEntity, true, "replace-requests",
+                "/nifi-api/process-groups/" + groupId + "/replace", importEntity::getVersionedFlowSnapshot);
+    }
+
+    /**
+     * Replace the Process Group with the given ID with the specified Process Group.
+     *
+     * This is the endpoint used in a cluster update replication scenario.
+     *
+     * @param groupId          The id of the process group to replace
+     * @param importEntity     A request entity containing revision info and the process group to replace with
+     * @return A ProcessGroupImportEntity.
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/replace")
+    @ApiOperation(
+            value = "Replace Process Group with the given ID with the specified Process Group",
+            response = ProcessGroupImportEntity.class,
+            notes = "This endpoint is used for replication within a cluster, when replacing
a flow with a new flow. It expects that the flow being"
+                    + "replaced is not under version control and that the given snapshot
will not modify any Processor that is currently running "
+                    + "or any Controller Service that is enabled. "
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Read - /process-groups/{uuid}"),
+                    @Authorization(value = "Write - /process-groups/{uuid}")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because
it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in
the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response replaceProcessGroup(@ApiParam(value = "The process group id.", required
= true) @PathParam("id") final String groupId,
+                                        @ApiParam(value = "The process group replace request
entity.", required = true) final ProcessGroupImportEntity importEntity) {
+
+        // Verify the request
+        final RevisionDTO revisionDto = importEntity.getProcessGroupRevision();
+        if (revisionDto == null) {
+            throw new IllegalArgumentException("Process Group Revision must be specified.");
+        }
+
+        final VersionedFlowSnapshot requestFlowSnapshot = importEntity.getVersionedFlowSnapshot();
+        if (requestFlowSnapshot == null) {
+            throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied.");
+        }
+
+        // Perform the request
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.PUT, importEntity);
+        } else if (isDisconnectedFromCluster()) {
+            verifyDisconnectedNodeModification(importEntity.isDisconnectedNodeAcknowledged());
+        }
+
+        final Revision requestRevision = getRevision(importEntity.getProcessGroupRevision(),
groupId);
+        return withWriteLock(
+                serviceFacade,
+                importEntity,
+                requestRevision,
+                lookup -> {
+                    final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+                    final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+                    processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+                    processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                },
+                () -> {
+                    // We do not enforce that the Process Group is 'not dirty' because at this point,
+                    // the client has explicitly indicated the dataflow that the Process Group should
+                    // provide and provided the Revision to ensure that they have the most up-to-date
+                    // view of the Process Group.
+                    serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false);
+                },
+                (revision, entity) -> {
+                    final ProcessGroupEntity updatedGroup =
+                            performUpdateFlow(groupId, revision, importEntity, entity.getVersionedFlowSnapshot(),
+                                    getIdGenerationSeed().orElse(null), false, true);
+
+                    // response to replication request is an entity with revision info but no versioned flow snapshot
+                    final ProcessGroupImportEntity responseEntity = new ProcessGroupImportEntity();
+                    responseEntity.setProcessGroupRevision(updatedGroup.getRevision());
+
+                    return generateOkResponse(responseEntity).build();
+                });
+    }
+
+    /**
+     * Retrieve a request to replace a Process Group by request ID.
+     *
+     * @param replaceRequestId  The ID of the replace request
+     * @return A ProcessGroupReplaceRequestEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("replace-requests/{id}")
+    @ApiOperation(
+            value = "Returns the Replace Request with the given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "Returns the Replace Request with the given ID. Once a Replace Request
has been created by performing a POST to /process-groups/{id}/replace-requests, "
+                    + "that request can subsequently be retrieved via this endpoint, and
the request that is fetched will contain the updated state, such as percent complete, the
"
+                    + "current state of the request, and any failures. "
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Only the user that submitted the request can
get it")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because
it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in
the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response getReplaceProcessGroupRequest(
+            @ApiParam("The ID of the Replace Request") @PathParam("id") final String replaceRequestId)
{
+        return retrieveFlowUpdateRequest("replace-requests", replaceRequestId);
+    }
+
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("replace-requests/{id}")
+    @ApiOperation(
+            value = "Deletes the Replace Request with the given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "Deletes the Replace Request with the given ID. After a request is created
via a POST to /process-groups/{id}/replace-requests, it is expected "
+                    + "that the client will properly clean up the request by DELETE'ing it,
once the Replace process has completed. If the request is deleted before the request "
+                    + "completes, then the Replace request will finish the step that it is
currently performing and then will cancel any subsequent steps. "
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Only the user that submitted the request can
remove it")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete the request because
it was invalid. The request should not be retried without modification."),
+            @ApiResponse(code = 401, message = "Client could not be authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi was not in
the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response deleteReplaceProcessGroupRequest(
+            @ApiParam(value = "Acknowledges that this node is disconnected to allow for mutable
requests to proceed.", required = false)
+                @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final
Boolean disconnectedNodeAcknowledged,
+            @ApiParam("The ID of the Update Request") @PathParam("id") final String replaceRequestId)
{
+        return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue());
+    }
+
+    /**
+     * Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates.
+     */
+    @Override
+    protected ProcessGroupEntity performUpdateFlow(final String groupId, final Revision revision,
final ProcessGroupImportEntity requestEntity,
+                                                   final VersionedFlowSnapshot flowSnapshot,
final String idGenerationSeed,
+                                                   final boolean verifyNotModified, final
boolean updateDescendantVersionedFlows) {
+        logger.info("Replacing Process Group with ID {} with imported Process Group with
ID {}", groupId, flowSnapshot.getFlowContents().getIdentifier());
+
+        // Step 10-11. Update Process Group to the new flow (including name) and update variable registry with any Variables that were added or removed
 
 Review comment:
   agreed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message