From issues-return-84537-archive-asf-public=cust-asf.ponee.io@nifi.apache.org Mon Sep 9 13:30:45 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0345418065C for ; Mon, 9 Sep 2019 15:30:43 +0200 (CEST) Received: (qmail 4532 invoked by uid 500); 9 Sep 2019 13:30:41 -0000 Mailing-List: contact issues-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list issues@nifi.apache.org Received: (qmail 4132 invoked by uid 99); 9 Sep 2019 13:30:40 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Sep 2019 13:30:40 +0000 From: GitBox To: issues@nifi.apache.org Subject: [GitHub] [nifi] adarmiento commented on a change in pull request #3700: NIFI-6638: Empty multiple queues at once at different flow levels Message-ID: <156803584063.21033.6445647591396022406.gitbox@gitbox.apache.org> Date: Mon, 09 Sep 2019 13:30:40 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit adarmiento commented on a change in pull request #3700: NIFI-6638: Empty multiple queues at once at different flow levels URL: https://github.com/apache/nifi/pull/3700#discussion_r322239861 ########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js ########## @@ -212,6 +212,417 @@ }).size() > 0; }; + /** + * Empty all the queues inside the array of connections. + * + * @Param {string} actionName + * @param {Array} connections + * @Param {Array} errors + * + */ + var emptyQueues = function (actionName, connections, errors) { + var cancelled = false; + var finished = false; + var progressBarRefreshingDelay = 100; //millis + + var dropRequests = []; + var dropRequestTimers = []; + var progressBarRefreshTimer = null; + var singleEmptyQueuePromises = []; + + var createFailedResponse = function (xhr, status, error) { + return { + success: false, + xhr: xhr, + status: status, + error: error + }; + }; + + var createSuccessResponse = function () { + return { + success: true + }; + }; + + // set the progress bar to a certain percentage + var setProgressBar = function (percentComplete) { + if($("#drop-request-percent-complete .progress-label").length) { + //set the request status text + $('#drop-request-status-message').text(''); + + //set progress bar + $('#drop-request-percent-complete .md-hue-2 .md-container.md-mode-determinate .md-bar.md-bar2:first') + .attr('style','transform: translateX(' + ((percentComplete - 100) / 2) + '%) scale(' + (percentComplete * 0.01) + ', 1);'); + + //set percentage + $("#drop-request-percent-complete .progress-label:first").text(percentComplete.toFixed(2) + '%'); + } + else { + var progressBar = $('#drop-request-percent-complete'); + + (nfNgBridge.injector.get('$compile')($(''))(nfNgBridge.rootScope)).appendTo(progressBar); + + progressBar.append($('
').text(percentComplete + '%')); + } + }; + + // process the drop request + var refreshProgressBar = function () { + // update the completed percentage + var percentCompleted = 0; + var droppedFlowfiles = 0; + var totalFlowfilesToDrop = 0; + + dropRequests.forEach(function (dropRequest) { + if( nfCommon.isDefinedAndNotNull(dropRequest) && + nfCommon.isDefinedAndNotNull(dropRequest.droppedCount) && + nfCommon.isDefinedAndNotNull(dropRequest.originalCount) ) { + droppedFlowfiles += dropRequest.droppedCount; + totalFlowfilesToDrop += dropRequest.originalCount; + } + }); + + if(totalFlowfilesToDrop !== 0) { + percentCompleted = (droppedFlowfiles / totalFlowfilesToDrop) * 100; + } + + setProgressBar(percentCompleted); + + // check if can keep on refreshing the progress bar + if (finished !== true && cancelled !== true) { + // wait delay to refresh again + progressBarRefreshTimer = setTimeout(function () { + // clear the progressBarRefreshTimer timer + progressBarRefreshTimer = null; + + // schedule to poll the status again in nextDelay + refreshProgressBar(progressBarRefreshingDelay); + }, progressBarRefreshingDelay); + } + }; + + var finalizeDropRequest = function (i) { + // tell the server to delete the drop request + if (nfCommon.isDefinedAndNotNull(dropRequests[i])) { + $.ajax({ + type: 'DELETE', + url: dropRequests[i].uri, + dataType: 'json' + }).done(function (response) { + // report the results of this drop request + dropRequests[i] = response.dropRequest; + }).always(function () { + // reload the connection status from the server and refreshes the connection UI + nfConnection.reloadStatus(connections[i].id); + // resolve this request + singleEmptyQueuePromises[i].resolve(createSuccessResponse()); + }); + } + }; + + // schedule for the next poll iteration + var getDropRequestStatus = function (i) { + $.ajax({ + type: 'GET', + url: dropRequests[i].uri, + dataType: 'json' + }).done(function (response) { + dropRequests[i] = response.dropRequest; + processDropRequestResponse(i); + }).fail(function (xhr, status, error) { + singleEmptyQueuePromises[i].resolve(createFailedResponse(xhr,status,error)); + }); + }; + + // process the drop request + var processDropRequestResponse = function (i) { + // close the dialog if + if (dropRequests[i].finished === true || cancelled === true) { + finalizeDropRequest(i); + } else { + // wait delay to poll again + dropRequestTimers[i] = setTimeout(function () { + // clear the drop request timer + dropRequestTimers[i] = null; + // schedule to poll the status again in nextDelay + getDropRequestStatus(i); + }, progressBarRefreshingDelay); + } + }; + + // empty a single queue and return a deferred representing the emptying process status + var emptyQueueAsync = function (i) { + var deferred = $.Deferred(); + + // issue the request to delete the flow files + $.ajax({ + type: 'POST', + url: '../nifi-api/flowfile-queues/' + encodeURIComponent(connections[i].id) + '/drop-requests', + dataType: 'json', + contentType: 'application/json' + }).done(function (response) { + // process the drop request + dropRequests[i] = response.dropRequest; + processDropRequestResponse(i); + }).fail(function (xhr, status, error) { + deferred.resolve(createFailedResponse(xhr,status,error)); + }); + + return deferred; + }; + + //start the drop requests + connections.forEach(function (connection,i) { + dropRequests[i] = null; + dropRequestTimers[i] = null; + singleEmptyQueuePromises.push(emptyQueueAsync(i)); + }); + + // initialize the progress bar value and auto refresh it each progressBarRefreshingDelay milliseconds + refreshProgressBar(); + + // update the button model and header of the drop request status dialog and show it + $('#drop-request-status-dialog') + .modal('setButtonModel', [{ + buttonText: 'Stop', + color: { + base: '#728E9B', + hover: '#004849', + text: '#ffffff' + }, + handler: { + click: function () { + //tell the singleEmptyQueue async jobs that the user cancelled the operation and thus they need to terminate + cancelled = true; + + // progressBarRefreshTimer !== null means there is a timeout in progress on the refreshProgressBar method + if (progressBarRefreshTimer !== null) { + // cancel it + clearTimeout(progressBarRefreshTimer); + } + } + }}] + ) + .modal('setHeaderText', actionName) + .modal('show'); + + $.when.apply($,singleEmptyQueuePromises).then(function () { + var responses = arguments; + finished = true; + + if (progressBarRefreshTimer !== null) { + // remove the timeout from the refreshProgressBar method if present + clearTimeout(progressBarRefreshTimer); + } + + //hide the status dialog + $('#drop-request-status-dialog').modal('hide'); + + var droppedFlowfiles = 0; + var droppedFlowfilesSize = 0; + var errorMessages = ""; + + if(nfCommon.isDefinedAndNotNull(errors)) { + errors.forEach(function (message) { + errorMessages += '

ProcessGroupID ' + message.processGroupId + ': ' + nfCommon.escapeHtml(message.errorMessage) + '

'; + }); + } + + //check for 403 error + for(var i = 0; i < responses.length; i++) { + var response = responses[i]; + if(response.success === false && response.xhr.status === 403) { + errorMessages += '

QueueID ' + connections[i].id + ': ' + response.xhr.status + ' - ' + nfCommon.escapeHtml(response.xhr.responseText) + '

'; + } + } + + dropRequests.forEach(function (dropRequest) { + if(nfCommon.isDefinedAndNotNull(dropRequest)) { + // build the results + droppedFlowfiles += dropRequest.droppedCount; + droppedFlowfilesSize += dropRequest.droppedSize; + + // if this request failed show the error + if (nfCommon.isDefinedAndNotNull(dropRequest.failureReason)) { + errorMessages += '

QueueID ' + dropRequest.id + ': ' + dropRequest.failureReason + '

'; + } + } + }); + + var results = $('
'); + + if(droppedFlowfiles !== 0) { + $('').text(droppedFlowfiles).appendTo(results); + $('').text(' FlowFiles (' + nfCommon.toReadableBytes(droppedFlowfilesSize) + ')').appendTo(results); + $('').text(' were removed from the ' + (connections.length > 1 ? 'queues.' : 'queue.' )).appendTo(results); + } + else { + results.text('No FlowFiles were removed.'); + } + + if(errorMessages !== "") { + $('

Failed Drop Requests:


').appendTo(results); + $('
').html(errorMessages).appendTo(results); + } + + // display the results + nfDialog.showOkDialog({ + headerText: actionName, + dialogContent: results, + okHandler: function () { + nfCanvasUtils.reload(); + } + }); + }); + }; + + /** + * Return the connections belonging to the specified process group. + * + * @param {string} processGroupId + * @param {boolean} recursive + */ + var getProcessGroupConnections = function (processGroupId, recursive) { + var deferredResponse = $.Deferred(); + var deferredConnectionsResponse = $.Deferred(); + var deferredProcessGroupsResponse = $.Deferred(); + + // get connections + $.ajax({ + type: 'GET', + url: '../nifi-api/process-groups/' + encodeURIComponent(processGroupId) + '/connections', + dataType: 'json' + }).done(function (response) { + deferredConnectionsResponse.resolve({ + success: true, + response: response.connections + }); + }).fail(function (xhr, status, error) { + deferredConnectionsResponse.resolve({ + success: false, + xhr: xhr, + status: status, + error: error + }); + }); + + // get process groups if recursive + if(recursive) { + $.ajax({ + type: 'GET', + url: '../nifi-api/process-groups/' + encodeURIComponent(processGroupId) + '/process-groups', + dataType: 'json' + }).done(function (response) { + deferredProcessGroupsResponse.resolve({ + success: true, + response: response.processGroups + }); + }).fail(function (xhr, status, error) { + deferredProcessGroupsResponse.resolve({ + success: false, + xhr: xhr, + status: status, + error: error + }); + }); + } + else { + deferredProcessGroupsResponse.resolve(); + } + + $.when(deferredConnectionsResponse, deferredProcessGroupsResponse) + .done(function (connectionsResponse, processGroupsResponse) { + var response = { + connections: [], + errorMessages: [] + }; + + if(connectionsResponse.success) { + response.connections = connectionsResponse.response; + } + else { + response.errorMessages.push({ + processGroupId: processGroupId, + errorMessage: connectionsResponse.xhr.status + ' - Unable to get queues. ' + connectionsResponse.xhr.responseText + }); + } + + if(!recursive) { + deferredResponse.resolve(response); + } + else { + if(!processGroupsResponse.success) { + response.errorMessages.push({ + processGroupId: processGroupId, + errorMessage: processGroupsResponse.xhr.status + ' - Unable to get process groups. ' + processGroupsResponse.xhr.responseText + }); + deferredResponse.resolve(response); + } + else { + var requests = processGroupsResponse.response.map(function (processGroup) { + return getProcessGroupConnections(processGroup.id,true); + }); + + $.when.apply($,requests).then(function () { + var responses = arguments; + + for(var i = 0; i < responses.length; i++) { + responses[i].connections.forEach(function (connection) { + response.connections.push(connection); + }); + + responses[i].errorMessages.forEach(function (errorMessage) { + response.errorMessages.push(errorMessage); + }); + } + + deferredResponse.resolve(response); + }); + } + } + }); + + return deferredResponse; + }; + + /** + * Return the connections belonging to the specified process groups. + * + * @param {Array} processGroupIDs + * @param {boolean} recursive + */ + var getProcessGroupsConnections = function (processGroupIDs, recursive) { + var deferredResponse = $.Deferred(); + + var deferredResponses = processGroupIDs.map(function (processGroupId) { + return getProcessGroupConnections(processGroupId,recursive); + }); + + $.when.apply($,deferredResponses).then(function () { + var responses = arguments; + + var response = { + connections: [], + errorMessages: [] + }; + + for(var i = 0; i < responses.length; i++) { Review comment: ```suggestion for (var i = 0; i < responses.length; i++) { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services