nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi] adarmiento commented on a change in pull request #3700: NIFI-6638: Empty multiple queues at once at different flow levels
Date Mon, 09 Sep 2019 13:30:40 GMT
adarmiento commented on a change in pull request #3700: NIFI-6638: Empty multiple queues at
once at different flow levels
URL: https://github.com/apache/nifi/pull/3700#discussion_r322239146
 
 

 ##########
 File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
 ##########
 @@ -212,6 +212,417 @@
             }).size() > 0;
     };
 
+    /**
+     * Empties every queue in the given array of connections.
+     *
+     * For each connection a drop request is POSTed, polled every
+     * progressBarRefreshingDelay milliseconds until it reports finished (or the user
+     * presses Stop), and then DELETEd. While the requests run, the drop request status
+     * dialog shows the aggregated progress; once every per-connection deferred has been
+     * resolved the dialog is hidden and a summary dialog is shown.
+     *
+     * @param {string} actionName   header text of the status dialog and of the results dialog
+     * @param {Array} connections   connections to empty; the id of each entry is read
+     * @param {Array} errors        optional entries carrying processGroupId and errorMessage;
+     *                              they are appended to the failure list of the results dialog
+     */
+    var emptyQueues = function (actionName, connections, errors) {
+        var cancelled = false;
+        var finished = false;
+        var progressBarRefreshingDelay = 100; //millis
+
+        // dropRequests[i], dropRequestTimers[i] and singleEmptyQueuePromises[i] all belong to connections[i]
+        var dropRequests = [];
+        var dropRequestTimers = [];
+        var progressBarRefreshTimer = null;
+        var singleEmptyQueuePromises = [];
+
+        // value a per-connection deferred is resolved with when an ajax call failed
+        var createFailedResponse = function (xhr, status, error) {
+            return {
+                success: false,
+                xhr: xhr,
+                status: status,
+                error: error
+            };
+        };
+
+        // value a per-connection deferred is resolved with once its drop request has been finalized
+        var createSuccessResponse = function () {
+            return {
+                success: true
+            };
+        };
+
+        // set the progress bar to a certain percentage
+        var setProgressBar = function (percentComplete) {
+            if ($("#drop-request-percent-complete .progress-label").length) {
+                // the bar already exists: update it in place
+
+                //set the request status text
+                $('#drop-request-status-message').text('');
+
+                //set progress bar
+                $('#drop-request-percent-complete .md-hue-2 .md-container.md-mode-determinate .md-bar.md-bar2:first')
+                    .attr('style', 'transform: translateX(' + ((percentComplete - 100) / 2) + '%) scale(' + (percentComplete * 0.01) + ', 1);');
+
+                //set percentage
+                $("#drop-request-percent-complete .progress-label:first").text(percentComplete.toFixed(2) + '%');
+            } else {
+                // first call: compile the progress element and append it together with its label
+                var progressBar = $('#drop-request-percent-complete');
+                var progressElement = $('<md-progress-linear ng-cloak ng-value="' + percentComplete +
+                    '" class="md-hue-2" md-mode="determinate" aria-label="Drop request percent complete"></md-progress-linear>');
+
+                (nfNgBridge.injector.get('$compile')(progressElement)(nfNgBridge.rootScope)).appendTo(progressBar);
+
+                progressBar.append($('<div class="progress-label"></div>').text(percentComplete + '%'));
+            }
+        };
+
+        // recompute the aggregated percentage and reschedule itself until finished or cancelled
+        var refreshProgressBar = function () {
+            var percentCompleted = 0;
+            var droppedFlowfiles = 0;
+            var totalFlowfilesToDrop = 0;
+
+            // only drop requests that already report both counters contribute to the total
+            dropRequests.forEach(function (dropRequest) {
+                if (nfCommon.isDefinedAndNotNull(dropRequest) &&
+                    nfCommon.isDefinedAndNotNull(dropRequest.droppedCount) &&
+                    nfCommon.isDefinedAndNotNull(dropRequest.originalCount)) {
+                    droppedFlowfiles += dropRequest.droppedCount;
+                    totalFlowfilesToDrop += dropRequest.originalCount;
+                }
+            });
+
+            // avoid dividing by zero while nothing is known yet
+            if (totalFlowfilesToDrop !== 0) {
+                percentCompleted = (droppedFlowfiles / totalFlowfilesToDrop) * 100;
+            }
+
+            setProgressBar(percentCompleted);
+
+            // check if can keep on refreshing the progress bar
+            if (finished !== true && cancelled !== true) {
+                // wait delay to refresh again
+                progressBarRefreshTimer = setTimeout(function () {
+                    // clear the progressBarRefreshTimer timer
+                    progressBarRefreshTimer = null;
+
+                    // refresh again
+                    refreshProgressBar();
+                }, progressBarRefreshingDelay);
+            }
+        };
+
+        // delete the drop request of connections[i] on the server and resolve its deferred
+        var finalizeDropRequest = function (i) {
+            if (!nfCommon.isDefinedAndNotNull(dropRequests[i])) {
+                // nothing to delete; still resolve so that the combined $.when below can settle
+                singleEmptyQueuePromises[i].resolve(createSuccessResponse());
+                return;
+            }
+
+            // tell the server to delete the drop request
+            $.ajax({
+                type: 'DELETE',
+                url: dropRequests[i].uri,
+                dataType: 'json'
+            }).done(function (response) {
+                // keep the final state of this drop request for the results dialog
+                dropRequests[i] = response.dropRequest;
+            }).always(function () {
+                // reload the connection status from the server and refreshes the connection UI
+                nfConnection.reloadStatus(connections[i].id);
+                // resolve this request
+                singleEmptyQueuePromises[i].resolve(createSuccessResponse());
+            });
+        };
+
+        // fetch the current state of the drop request of connections[i]
+        var getDropRequestStatus = function (i) {
+            $.ajax({
+                type: 'GET',
+                url: dropRequests[i].uri,
+                dataType: 'json'
+            }).done(function (response) {
+                dropRequests[i] = response.dropRequest;
+                processDropRequestResponse(i);
+            }).fail(function (xhr, status, error) {
+                singleEmptyQueuePromises[i].resolve(createFailedResponse(xhr, status, error));
+            });
+        };
+
+        // finalize the drop request of connections[i] or schedule the next poll
+        var processDropRequestResponse = function (i) {
+            // stop polling once the request is finished or the user cancelled
+            if (dropRequests[i].finished === true || cancelled === true) {
+                finalizeDropRequest(i);
+            } else {
+                // wait delay to poll again
+                dropRequestTimers[i] = setTimeout(function () {
+                    // clear the drop request timer
+                    dropRequestTimers[i] = null;
+                    // poll the status again
+                    getDropRequestStatus(i);
+                }, progressBarRefreshingDelay);
+            }
+        };
+
+        // empty a single queue and return a deferred representing the emptying process status
+        var emptyQueueAsync = function (i) {
+            var deferred = $.Deferred();
+
+            // issue the request to delete the flow files
+            $.ajax({
+                type: 'POST',
+                url: '../nifi-api/flowfile-queues/' + encodeURIComponent(connections[i].id) + '/drop-requests',
+                dataType: 'json',
+                contentType: 'application/json'
+            }).done(function (response) {
+                // process the drop request
+                dropRequests[i] = response.dropRequest;
+                processDropRequestResponse(i);
+            }).fail(function (xhr, status, error) {
+                deferred.resolve(createFailedResponse(xhr, status, error));
+            });
+
+            return deferred;
+        };
+
+        //start the drop requests
+        connections.forEach(function (connection, i) {
+            dropRequests[i] = null;
+            dropRequestTimers[i] = null;
+            singleEmptyQueuePromises.push(emptyQueueAsync(i));
+        });
+
+        // initialize the progress bar value and auto refresh it each progressBarRefreshingDelay milliseconds
+        refreshProgressBar();
+
+        // update the button model and header of the drop request status dialog and show it
+        $('#drop-request-status-dialog')
+            .modal('setButtonModel', [{
+                buttonText: 'Stop',
+                color: {
+                    base: '#728E9B',
+                    hover: '#004849',
+                    text: '#ffffff'
+                },
+                handler: {
+                    click: function () {
+                        // processDropRequestResponse reads this flag on its next poll and finalizes instead of polling again
+                        cancelled = true;
+
+                        // progressBarRefreshTimer !== null means there is a timeout in progress on the refreshProgressBar method
+                        if (progressBarRefreshTimer !== null) {
+                            // cancel it
+                            clearTimeout(progressBarRefreshTimer);
+                            progressBarRefreshTimer = null;
+                        }
+                    }
+                }}]
+            )
+            .modal('setHeaderText', actionName)
+            .modal('show');
+
+        // every deferred is only ever resolved (never rejected), so this runs once all queues are done
+        $.when.apply($, singleEmptyQueuePromises).then(function () {
+            // one resolved value per connection, in the order of the connections array
+            var responses = arguments;
+            finished = true;
+
+            if (progressBarRefreshTimer !== null) {
+                // remove the timeout from the refreshProgressBar method if present
+                clearTimeout(progressBarRefreshTimer);
+                progressBarRefreshTimer = null;
+            }
+
+            //hide the status dialog
+            $('#drop-request-status-dialog').modal('hide');
+
+            var droppedFlowfiles = 0;
+            var droppedFlowfilesSize = 0;
+
+            // errorMessages is inserted with .html() below, so every dynamic value is escaped first
+            var errorMessages = "";
+
+            if (nfCommon.isDefinedAndNotNull(errors)) {
+                errors.forEach(function (message) {
+                    errorMessages += '<p>ProcessGroupID ' + nfCommon.escapeHtml(message.processGroupId) +
+                        ': <span style="color: red">' + nfCommon.escapeHtml(message.errorMessage) + '</span></p>';
+                });
+            }
+
+            //check for 403 error
+            // NOTE(review): failed responses with any other status are not listed here - confirm this is intended
+            for (var i = 0; i < responses.length; i++) {
+                var response = responses[i];
+                if (response.success === false && response.xhr.status === 403) {
+                    errorMessages += '<p>QueueID ' + nfCommon.escapeHtml(connections[i].id) +
+                        ': <span style="color: red">' + response.xhr.status + ' - ' +
+                        nfCommon.escapeHtml(response.xhr.responseText) + '</span></p>';
+                }
+            }
+
+            dropRequests.forEach(function (dropRequest) {
+                if (nfCommon.isDefinedAndNotNull(dropRequest)) {
+                    // build the results
+                    droppedFlowfiles += dropRequest.droppedCount;
+                    droppedFlowfilesSize += dropRequest.droppedSize;
+
+                    // if this request failed show the error
+                    if (nfCommon.isDefinedAndNotNull(dropRequest.failureReason)) {
+                        errorMessages += '<p>QueueID ' + nfCommon.escapeHtml(dropRequest.id) +
+                            ': <span style="color: red">' + nfCommon.escapeHtml(dropRequest.failureReason) + '</span></p>';
+                    }
+                }
+            });
+
+            var results = $('<div></div>');
+
+            if (droppedFlowfiles !== 0) {
+                $('<span class="label"></span>').text(droppedFlowfiles).appendTo(results);
+                $('<span></span>').text(' FlowFiles (' + nfCommon.toReadableBytes(droppedFlowfilesSize) + ')').appendTo(results);
+                $('<span></span>').text(' were removed from the ' + (connections.length > 1 ? 'queues.' : 'queue.')).appendTo(results);
+            } else {
+                results.text('No FlowFiles were removed.');
+            }
+
+            if (errorMessages !== "") {
+                $('<br/><br/><p style="color: darkred">Failed Drop Requests:</p><br/>').appendTo(results);
+                $('<div style="color: darkred"></div>').html(errorMessages).appendTo(results);
+            }
+
+            // display the results
+            nfDialog.showOkDialog({
+                headerText: actionName,
+                dialogContent: results,
+                okHandler: function () {
+                    nfCanvasUtils.reload();
+                }
+            });
+        });
+    };
+
+    /**
+     * Return the connections belonging to the specified process group.
+     *
+     * @param {string} processGroupId
+     * @param {boolean} recursive
+     */
+    var getProcessGroupConnections = function (processGroupId, recursive) {
+        var deferredResponse = $.Deferred();
+        var deferredConnectionsResponse = $.Deferred();
+        var deferredProcessGroupsResponse = $.Deferred();
+
+        // get connections
+        $.ajax({
+            type: 'GET',
+            url: '../nifi-api/process-groups/' + encodeURIComponent(processGroupId) + '/connections',
+            dataType: 'json'
+        }).done(function (response) {
+            deferredConnectionsResponse.resolve({
+                success: true,
+                response: response.connections
+            });
+        }).fail(function (xhr, status, error) {
+            deferredConnectionsResponse.resolve({
+                success: false,
+                xhr: xhr,
+                status: status,
+                error: error
+            });
+        });
+
+        // get process groups if recursive
+        if(recursive) {
 
 Review comment:
   ```suggestion
           if (recursive) {
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message