From issues-return-82426-archive-asf-public=cust-asf.ponee.io@nifi.apache.org Wed Aug 7 15:48:14 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 63F1B180643 for ; Wed, 7 Aug 2019 17:48:14 +0200 (CEST) Received: (qmail 42204 invoked by uid 500); 7 Aug 2019 15:48:13 -0000 Mailing-List: contact issues-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list issues@nifi.apache.org Received: (qmail 42195 invoked by uid 99); 7 Aug 2019 15:48:13 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Aug 2019 15:48:13 +0000 From: GitBox To: issues@nifi.apache.org Subject: [GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor Message-ID: <156519289360.23464.14052681687324343984.gitbox@gitbox.apache.org> Date: Wed, 07 Aug 2019 15:48:13 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit arpadboda commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r311629099 ########## File path: nanofi/src/api/ecu.c ########## @@ -0,0 +1,494 @@ +#include "api/ecu.h" +#include "api/nanofi.h" +#include "core/string_utils.h" +#include "core/cstructs.h" +#include "core/file_utils.h" +#include "core/flowfiles.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +processor_params * procparams = NULL; +volatile sig_atomic_t stopped = 0; + +void free_proc_params(char * uuid) { + + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp) { + free_flow_file_list(&pp->ff_list); + free(pp->uuid_str); + HASH_DEL(procparams, pp); + free(pp); + } +} + +void signal_handler(int signum) { + if (signum == SIGINT || signum == SIGTERM) { + stopped = 1; + } +} + +void init_common_input(tailfile_input_params * input_params, char ** args) { + if (args && *args) { + input_params->file = args[1]; + input_params->interval = args[2]; + input_params->instance = args[4]; + input_params->tcp_port = args[5]; + input_params->nifi_port_uuid = args[6]; + } +} + +tailfile_input_params init_logaggregate_input(char ** args) { + tailfile_input_params input_params; + memset(&input_params, 0, sizeof(input_params)); + init_common_input(&input_params, args); + input_params.delimiter = args[3]; + return input_params; +} + +tailfile_input_params init_tailfile_chunk_input(char ** args) { + tailfile_input_params input_params; + memset(&input_params, 0, sizeof(input_params)); + init_common_input(&input_params, args); + input_params.chunk_size = args[3]; + return input_params; +} + +int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num) { + if (access(params->file, F_OK) == -1) { + printf("Error: %s doesn't exist!\n", params->file); + return -1; + } + + struct stat stats; + int ret = stat(params->file, &stats); + + if (ret == -1) { + printf("Error occurred while getting file status {file: %s, error: %s}\n", params->file, strerror(errno)); + return -1; + } + // Check for file existence + if (S_ISDIR(stats.st_mode)){ + printf("Error: %s is a directory!\n", params->file); + return -1; + } + + errno = 0; + *intrvl = (uint64_t)(strtoul(params->interval, NULL, 10)); + + if (errno != 0) { + printf("Invalid interval value specified\n"); + return -1; + } + + errno = 0; + *port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10)); + if (errno != 0) { + printf("Cannot convert tcp port to numeric value\n"); + return -1; + } + return 0; +} + +void setup_signal_action() { + struct sigaction action; + memset(&action, 0, sizeof(sigaction)); + action.sa_handler = signal_handler; + sigaction(SIGTERM, &action, NULL); + sigaction(SIGINT, &action, NULL); +} + +nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *)) { + nifi_proc_params params; + nifi_port port; + port.port_id = input_params->nifi_port_uuid; + + nifi_instance * instance = create_instance(input_params->instance, &port); + add_custom_processor(processor_name, callback); + standalone_processor * proc = create_processor(processor_name, instance); + params.instance = instance; + params.processor = proc; + return params; +} + +void add_to_hash_table(flow_file_record * ffr, uint64_t offset, char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp == NULL) { + pp = (struct processor_params*)malloc(sizeof(struct processor_params)); + memset(pp, 0, sizeof(struct processor_params)); + char * uuid_str = (char *)malloc((strlen(uuid) + 1) * sizeof(char)); + strcpy(uuid_str, uuid); + pp->uuid_str = uuid_str; + HASH_ADD_KEYPTR(hh, procparams, pp->uuid_str, strlen(pp->uuid_str), pp); + } + add_flow_file_record(&pp->ff_list, ffr); + pp->curr_offset = offset; +} + +void delete_all_flow_files_from_proc(const char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp) { + struct flow_file_list * head = pp->ff_list; + while (head) { + struct flow_file_list * tmp = head; + free_flowfile(tmp->ff_record); + head = head->next; + free(tmp); + } + pp->ff_list = head; + } +} + +void delete_completed_flow_files_from_proc(const char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp) { + struct flow_file_list * head = pp->ff_list; + while (head) { + struct flow_file_list * tmp = head; + if (tmp->complete) { + free_flowfile(tmp->ff_record); + head = head->next; + free(tmp); + } + else { + break; + } + } + pp->ff_list = head; + } +} + +uint64_t get_current_offset(char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp) { + return pp->curr_offset; + } + return 0; Review comment: Isn't 0 a valid offset? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services