nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor
Date Mon, 12 Aug 2019 17:51:01 GMT
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited
processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r313048735
 
 

 ##########
 File path: nanofi/src/api/ecu.c
 ##########
 @@ -0,0 +1,494 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(char * uuid) {
+
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        free_flow_file_list(&pp->ff_list);
+        free(pp->uuid_str);
+        HASH_DEL(procparams, pp);
+        free(pp);
+    }
+}
+
+void signal_handler(int signum) {
+    if (signum == SIGINT || signum == SIGTERM) {
+        stopped = 1;
+    }
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+    if (args && *args) {
+        input_params->file = args[1];
+        input_params->interval = args[2];
+        input_params->instance = args[4];
+        input_params->tcp_port = args[5];
+        input_params->nifi_port_uuid = args[6];
+    }
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+    tailfile_input_params input_params;
+    memset(&input_params, 0, sizeof(input_params));
+    init_common_input(&input_params, args);
+    input_params.delimiter = args[3];
+    return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+    tailfile_input_params input_params;
+    memset(&input_params, 0, sizeof(input_params));
+    init_common_input(&input_params, args);
+    input_params.chunk_size = args[3];
+    return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num)
{
+    if (access(params->file, F_OK) == -1) {
+        printf("Error: %s doesn't exist!\n", params->file);
+        return -1;
+    }
+
+    struct stat stats;
+    int ret = stat(params->file, &stats);
+
+    if (ret == -1) {
+        printf("Error occurred while getting file status {file: %s, error: %s}\n", params->file,
strerror(errno));
+        return -1;
+    }
+    // Check for file existence
+    if (S_ISDIR(stats.st_mode)){
+        printf("Error: %s is a directory!\n", params->file);
+        return -1;
+    }
+
+    errno = 0;
+    *intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+    if (errno != 0) {
+        printf("Invalid interval value specified\n");
+        return -1;
+    }
+
+    errno = 0;
+    *port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+    if (errno != 0) {
+        printf("Cannot convert tcp port to numeric value\n");
+        return -1;
+    }
+    return 0;
+}
+
+void setup_signal_action() {
+    struct sigaction action;
+    memset(&action, 0, sizeof(sigaction));
+    action.sa_handler = signal_handler;
+    sigaction(SIGTERM, &action, NULL);
+    sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char *
processor_name, void(*callback)(processor_session *, processor_context *)) {
+    nifi_proc_params params;
+    nifi_port port;
+    port.port_id = input_params->nifi_port_uuid;
+
+    nifi_instance * instance = create_instance(input_params->instance, &port);
+    add_custom_processor(processor_name, callback);
+    standalone_processor * proc = create_processor(processor_name, instance);
+    params.instance = instance;
+    params.processor = proc;
+    return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp == NULL) {
+        pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+        memset(pp, 0, sizeof(struct processor_params));
+        char * uuid_str = (char *)malloc((strlen(uuid) + 1) * sizeof(char));
+        strcpy(uuid_str, uuid);
+        pp->uuid_str = uuid_str;
+        HASH_ADD_KEYPTR(hh, procparams, pp->uuid_str, strlen(pp->uuid_str), pp);
+    }
+    add_flow_file_record(&pp->ff_list, ffr);
+    pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        struct flow_file_list * head = pp->ff_list;
+        while (head) {
+            struct flow_file_list * tmp = head;
+            free_flowfile(tmp->ff_record);
+            head = head->next;
+            free(tmp);
+        }
+        pp->ff_list = head;
+    }
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        struct flow_file_list * head = pp->ff_list;
+        while (head) {
+            struct flow_file_list * tmp = head;
+            if (tmp->complete) {
+                free_flowfile(tmp->ff_record);
+                head = head->next;
+                free(tmp);
+            }
+            else {
+                break;
+            }
+        }
+        pp->ff_list = head;
+    }
+}
+
+uint64_t get_current_offset(char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp) {
+        return pp->curr_offset;
+    }
+    return 0;
 
 Review comment:
   In subsequent calls, the processor is found in the hashtable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message