nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor
Date Tue, 13 Aug 2019 10:32:37 GMT
bakaid commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited
processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r313296562
 
 

 ##########
 File path: nanofi/src/api/ecu.c
 ##########
 @@ -0,0 +1,493 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+processor_params * procparams = NULL; /* head of the per-processor table accessed through the HASH_* macros, keyed by uuid_str */
+volatile sig_atomic_t stopped = 0; /* set to 1 by signal_handler() when SIGINT or SIGTERM arrives */
+
+/**
+ * Removes the entry registered under uuid from the global procparams table.
+ *
+ * The entry's flow file list is handed to free_flow_file_list(), the entry is
+ * unlinked from the table and then freed. Does nothing when no entry is
+ * registered under uuid.
+ *
+ * @param uuid key of the entry to remove
+ */
+void free_proc_params(const char * uuid) {
+    struct processor_params * entry = NULL;
+    HASH_FIND_STR(procparams, uuid, entry);
+    if (!entry) {
+        return;
+    }
+
+    free_flow_file_list(&entry->ff_list);
+    HASH_DEL(procparams, entry);
+    free(entry);
+}
+
+/**
+ * Signal handler that raises the global stopped flag.
+ *
+ * Sets stopped to 1 when invoked for SIGINT or SIGTERM; every other signal
+ * number is ignored.
+ *
+ * @param signum number of the delivered signal
+ */
+void signal_handler(int signum) {
+    switch (signum) {
+        case SIGINT:
+        case SIGTERM:
+            stopped = 1;
+            break;
+        default:
+            break;
+    }
+}
+
+/**
+ * Fills the fields shared by all tailfile input variants from an argument vector.
+ *
+ * Only the pointers are stored; the strings themselves are not copied. Slot 3
+ * of args is not read here. Nothing is written when args is NULL or its first
+ * element is NULL.
+ *
+ * @param input_params structure to fill
+ * @param args         argument vector; slots 1, 2, 4, 5 and 6 are read
+ */
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+    if (!args || !*args) {
+        return;
+    }
+
+    input_params->file           = args[1];
+    input_params->interval       = args[2];
+    input_params->instance       = args[4];
+    input_params->tcp_port       = args[5];
+    input_params->nifi_port_uuid = args[6];
+}
+
+/**
+ * Builds the input parameters for the delimiter-based (log aggregate) variant.
+ *
+ * The result is zero-initialized, the common fields are filled by
+ * init_common_input() and slot 3 of args is stored as the delimiter. When
+ * args is NULL or its first element is NULL, the zero-initialized structure
+ * is returned unchanged instead of reading args[3].
+ *
+ * @param args argument vector; pointers are stored, strings are not copied
+ * @return the populated parameters, returned by value
+ */
+tailfile_input_params init_logaggregate_input(char ** args) {
+    tailfile_input_params input_params;
+    memset(&input_params, 0, sizeof(input_params));
+    init_common_input(&input_params, args);
+    /* Same guard as init_common_input(): never index into a missing argument vector. */
+    if (args && *args) {
+        input_params.delimiter = args[3];
+    }
+    return input_params;
+}
+
+/**
+ * Builds the input parameters for the fixed-chunk-size variant.
+ *
+ * The result is zero-initialized, the common fields are filled by
+ * init_common_input() and slot 3 of args is stored as the chunk size. When
+ * args is NULL or its first element is NULL, the zero-initialized structure
+ * is returned unchanged instead of reading args[3].
+ *
+ * @param args argument vector; pointers are stored, strings are not copied
+ * @return the populated parameters, returned by value
+ */
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+    tailfile_input_params input_params;
+    memset(&input_params, 0, sizeof(input_params));
+    init_common_input(&input_params, args);
+    /* Same guard as init_common_input(): never index into a missing argument vector. */
+    if (args && *args) {
+        input_params.chunk_size = args[3];
+    }
+    return input_params;
+}
+
+/**
+ * Parses text as a base-10 unsigned integer.
+ *
+ * The whole string must be consumed: empty input, trailing characters and
+ * out-of-range values are all rejected.
+ *
+ * @param text string to parse; NULL is rejected
+ * @param out  receives the parsed value on success, untouched on failure
+ * @return 0 on success, -1 otherwise
+ */
+static int parse_base10_u64(const char * text, uint64_t * out) {
+    if (!text) {
+        return -1;
+    }
+
+    char * end = NULL;
+    errno = 0;
+    unsigned long parsed = strtoul(text, &end, 10);
+    /* strtoul() reports "no digits" only through the end pointer, not through errno. */
+    if (errno != 0 || end == text || *end != '\0') {
+        return -1;
+    }
+
+    *out = (uint64_t)parsed;
+    return 0;
+}
+
+/**
+ * Validates the input parameters and converts their numeric fields.
+ *
+ * Checks that params->file exists and is not a directory, then parses
+ * params->interval and params->tcp_port as base-10 unsigned integers.
+ * A message is printed to stdout for every failure.
+ *
+ * @param params   input parameters; must not be NULL and file must be set
+ * @param intrvl   receives the parsed interval once it has been validated
+ * @param port_num receives the parsed tcp port once it has been validated
+ * @return 0 when everything is valid, -1 otherwise
+ */
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num) {
+    if (!params || !params->file) {
+        printf("Error: no file specified\n");
+        return -1;
+    }
+
+    // Check for file existence
+    if (access(params->file, F_OK) == -1) {
+        printf("Error: %s doesn't exist!\n", params->file);
+        return -1;
+    }
+
+    struct stat stats;
+    int ret = stat(params->file, &stats);
+
+    if (ret == -1) {
+        printf("Error occurred while getting file status {file: %s, error: %s}\n", params->file, strerror(errno));
+        return -1;
+    }
+    // A directory cannot be tailed
+    if (S_ISDIR(stats.st_mode)) {
+        printf("Error: %s is a directory!\n", params->file);
+        return -1;
+    }
+
+    if (parse_base10_u64(params->interval, intrvl) != 0) {
+        printf("Invalid interval value specified\n");
+        return -1;
+    }
+
+    if (parse_base10_u64(params->tcp_port, port_num) != 0) {
+        printf("Cannot convert tcp port to numeric value\n");
+        return -1;
+    }
+    return 0;
+}
+
+/**
+ * Installs signal_handler() as the handler for SIGTERM and SIGINT.
+ *
+ * The action structure is fully zeroed and its signal mask explicitly
+ * emptied before use, so no flags are set and no additional signals are
+ * blocked while the handler runs.
+ */
+void setup_signal_action() {
+    struct sigaction action;
+    /* sizeof(action), not sizeof(sigaction): the latter names the function, not the struct. */
+    memset(&action, 0, sizeof(action));
+    sigemptyset(&action.sa_mask);
+    action.sa_handler = signal_handler;
+    sigaction(SIGTERM, &action, NULL);
+    sigaction(SIGINT, &action, NULL);
+}
+
+/**
+ * Creates a nifi instance and a standalone custom processor bound to it.
+ *
+ * The instance is created from input_params->instance and a port whose id is
+ * input_params->nifi_port_uuid. The callback is registered under
+ * processor_name before the processor of that name is created.
+ *
+ * @param input_params   supplies the instance string and the port uuid
+ * @param processor_name name the custom processor is registered and created under
+ * @param callback       function registered for the custom processor
+ * @return the created instance and processor, returned by value
+ */
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params,
+                                      const char * processor_name,
+                                      void(*callback)(processor_session *, processor_context *)) {
+    nifi_port target_port;
+    target_port.port_id = input_params->nifi_port_uuid;
+
+    nifi_proc_params result;
+    result.instance = create_instance(input_params->instance, &target_port);
+    add_custom_processor(processor_name, callback);
+    result.processor = create_processor(processor_name, result.instance);
+    return result;
+}
+
+/**
+ * Appends a flow file record to the entry registered under uuid and records
+ * the new offset, creating the entry first when it does not exist yet.
+ *
+ * When a new entry is needed and uuid does not fit into the entry's uuid_str
+ * key, or the allocation fails, a message is printed and nothing is stored;
+ * ffr is then left untouched and remains owned by the caller.
+ *
+ * @param ffr    flow file record to append to the entry's list
+ * @param offset value stored as the entry's current offset
+ * @param uuid   key of the entry
+ */
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * uuid) {
+    struct processor_params * pp = NULL;
+    HASH_FIND_STR(procparams, uuid, pp);
+    if (pp == NULL) {
+        /* sizeof on the member is unevaluated, so pp being NULL here is fine. */
+        if (strlen(uuid) >= sizeof(pp->uuid_str)) {
+            printf("Processor uuid is too long: %s\n", uuid);
+            return;
+        }
+        pp = (struct processor_params *)calloc(1, sizeof(*pp));
+        if (pp == NULL) {
+            printf("Unable to allocate processor params for uuid %s\n", uuid);
+            return;
+        }
+        /* Length was checked above, so the key is copied without truncation. */
+        snprintf(pp->uuid_str, sizeof(pp->uuid_str), "%s", uuid);
+        HASH_ADD_STR(procparams, uuid_str, pp);
+    }
+
+    add_flow_file_record(&pp->ff_list, ffr);
+    pp->curr_offset = offset;
+}
+
+/**
+ * Frees every node of the flow file list held by the entry registered under
+ * uuid and leaves the entry with an empty (NULL) list.
+ *
+ * Each node's record is passed to free_flowfile() before the node itself is
+ * freed. The entry stays in the table. Does nothing when no entry is
+ * registered under uuid.
+ *
+ * @param uuid key of the entry whose list is emptied
+ */
+void delete_all_flow_files_from_proc(const char * uuid) {
+    struct processor_params * entry = NULL;
+    HASH_FIND_STR(procparams, uuid, entry);
+    if (!entry) {
+        return;
+    }
+
+    struct flow_file_list * node = entry->ff_list;
+    while (node != NULL) {
+        free_flowfile(node->ff_record);
+        struct flow_file_list * following = node->next;
+        free(node);
+        node = following;
+    }
+    entry->ff_list = NULL;
+}
+
+/**
+ * Frees the leading run of completed nodes in the flow file list held by the
+ * entry registered under uuid.
+ *
+ * Nodes are released from the head for as long as their complete flag is set;
+ * the first node that is not complete becomes the new head, so completed
+ * nodes located after it are kept. Does nothing when no entry is registered
+ * under uuid.
+ *
+ * @param uuid key of the entry whose list is trimmed
+ */
+void delete_completed_flow_files_from_proc(const char * uuid) {
+    struct processor_params * entry = NULL;
+    HASH_FIND_STR(procparams, uuid, entry);
+    if (!entry) {
+        return;
+    }
+
+    struct flow_file_list * node = entry->ff_list;
+    while (node != NULL && node->complete) {
+        free_flowfile(node->ff_record);
+        struct flow_file_list * following = node->next;
+        free(node);
+        node = following;
+    }
+    entry->ff_list = node;
+}
+
+/**
+ * Looks up the current offset stored for uuid.
+ *
+ * @param uuid key of the entry
+ * @return the entry's curr_offset, or 0 when no entry is registered under uuid
+ */
+uint64_t get_current_offset(const char * uuid) {
+    struct processor_params * entry = NULL;
+    HASH_FIND_STR(procparams, uuid, entry);
+    return entry ? entry->curr_offset : 0;
+}
+
+/**
+ * Finds the entry registered under uuid in the global procparams table.
+ *
+ * @param uuid key of the entry
+ * @return the entry, or NULL when none is registered under uuid
+ */
+processor_params * get_proc_params(const char * uuid) {
+    struct processor_params * found = NULL;
+    HASH_FIND_STR(procparams, uuid, found);
+    return found;
+}
+
+/**
+ * Replaces the flow file list of the entry registered under uuid and
+ * advances its offset.
+ *
+ * Existing entry: its current list is released through
+ * delete_all_flow_files_from_proc(), value is added to curr_offset and ffl
+ * becomes the new list.
+ * Missing entry: a new one is created with curr_offset set to value and ffl
+ * as its list. If uuid does not fit into the uuid_str key, or the allocation
+ * fails, a message is printed and nothing is stored; ffl then remains owned
+ * by the caller.
+ *
+ * @param uuid  key of the entry
+ * @param value amount added to (or, for a new entry, stored as) the offset
+ * @param ffl   list that becomes the entry's flow file list
+ */
+void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ffl) {
+    struct processor_params * pp = get_proc_params(uuid);
+    if (pp) {
+        delete_all_flow_files_from_proc(uuid);
+        pp->curr_offset += value;
+        pp->ff_list = ffl;
+        return;
+    }
+
+    /* sizeof on the member is unevaluated, so pp being NULL here is fine. */
+    if (strlen(uuid) >= sizeof(pp->uuid_str)) {
+        printf("Processor uuid is too long: %s\n", uuid);
+        return;
+    }
+    pp = (struct processor_params *)calloc(1, sizeof(*pp));
+    if (!pp) {
+        printf("Unable to allocate processor params for uuid %s\n", uuid);
+        return;
+    }
+    pp->ff_list = ffl;
+    pp->curr_offset = value;
+    /* Length was checked above, so the key is copied without truncation. */
+    snprintf(pp->uuid_str, sizeof(pp->uuid_str), "%s", uuid);
+    HASH_ADD_STR(procparams, uuid_str, pp);
+}
+
+/**
+ * Advances the offset stored for uuid by value, creating the entry when it
+ * does not exist yet.
+ *
+ * A newly created entry starts with curr_offset equal to value. If uuid does
+ * not fit into the uuid_str key, or the allocation fails, a message is
+ * printed, nothing is stored and 0 is returned.
+ *
+ * @param uuid  key of the entry
+ * @param value amount added to the entry's offset
+ * @return the entry's offset after the update, or 0 when no entry could be created
+ */
+uint64_t update_curr_offset(const char * uuid, uint64_t value) {
+    struct processor_params * pp = get_proc_params(uuid);
+    if (pp) {
+        pp->curr_offset += value;
+        return pp->curr_offset;
+    }
+
+    /* sizeof on the member is unevaluated, so pp being NULL here is fine. */
+    if (strlen(uuid) >= sizeof(pp->uuid_str)) {
+        printf("Processor uuid is too long: %s\n", uuid);
+        return 0;
+    }
+    pp = (struct processor_params *)calloc(1, sizeof(*pp));
+    if (!pp) {
+        printf("Unable to allocate processor params for uuid %s\n", uuid);
+        return 0;
+    }
+    /* Length was checked above, so the key is copied without truncation. */
+    snprintf(pp->uuid_str, sizeof(pp->uuid_str), "%s", uuid);
+    pp->curr_offset = value;
+    HASH_ADD_STR(procparams, uuid_str, pp);
+    return pp->curr_offset;
+}
+
+/**
+ * Trigger callback that tails a file in fixed-size chunks.
+ *
+ * Reads the "file_path" and "chunk_size" properties from ctx, resumes at the
+ * offset stored for this processor's uuid and turns every complete chunk into
+ * a flow file, which is registered together with the new offset through
+ * add_to_hash_table(). A trailing partial chunk is left in the file for a
+ * later trigger.
+ *
+ * Returns early, after printing a message where one is defined, when the
+ * uuid is missing or too long, a property cannot be read, the chunk size is
+ * not a positive base-10 number, or the file cannot be opened or positioned.
+ *
+ * @param ps  processor session (unused)
+ * @param ctx processor context supplying the uuid and the properties
+ */
+void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx) {
+    (void)ps;
+    char file_path[4096];
+    char chunk_size[50];
+    char uuid_str[37];
+
+    initialize_content_repo(ctx);
+    const char * uuid = get_proc_uuid_from_context(ctx);
+    if (!uuid || strlen(uuid) >= sizeof(uuid_str)) {
+        printf("Invalid processor uuid\n");
+        return;
+    }
+    printf("uuid = %s\n", uuid);
+    snprintf(uuid_str, sizeof(uuid_str), "%s", uuid);
+
+    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+        return;
+    }
+
+    if (get_property(ctx, "chunk_size", chunk_size, sizeof(chunk_size)) != 0) {
+        return;
+    }
+
+    char * end = NULL;
+    errno = 0;
+    uint64_t chunk_size_value = strtoul(chunk_size, &end, 10);
+
+    /* A chunk size of 0 can never produce a flow file, so it is rejected with the other invalid values. */
+    if (errno != 0 || end == chunk_size || *end != '\0' || chunk_size_value == 0) {
+        printf("Invalid chunk size specified\n");
+        return;
+    }
+
+    FILE * fp = fopen(file_path, "rb");
+
+    if (!fp) {
+        printf("Unable to open file. {file: %s, reason: %s}\n", file_path, strerror(errno));
+        return;
+    }
+
+    char * buff = (char *)malloc((size_t)chunk_size_value);
+    if (!buff) {
+        printf("Unable to allocate chunk buffer\n");
+        fclose(fp);
+        return;
+    }
+
+    uint64_t curr_offset = get_current_offset(uuid_str);
+    if (fseek(fp, (long)curr_offset, SEEK_SET) != 0) {
+        printf("Unable to seek in file. {file: %s, reason: %s}\n", file_path, strerror(errno));
+        free(buff);
+        fclose(fp);
+        return;
+    }
+
+    size_t bytes_read = 0;
+    /* Only complete chunks are emitted; a short read ends the loop. */
+    while ((bytes_read = fread(buff, 1, (size_t)chunk_size_value, fp)) == chunk_size_value) {
+        /* The file is read in binary mode: use the byte count, not strlen(), so embedded NULs are kept. */
+        flow_file_record * ffr = write_to_flow(buff, bytes_read, ctx);
+        /* Tracked arithmetically, which matches the stream position after a successful seek and full read. */
+        curr_offset += bytes_read;
+        add_attributes(ffr, file_path, curr_offset);
+        add_to_hash_table(ffr, curr_offset, uuid_str);
+    }
+    free(buff);
+    fclose(fp);
+}
+
+/**
+ * Reads file_path from the offset stored for this processor and turns every
+ * token produced by tokenize_string_tailfile() into a flow file.
+ *
+ * The file is read in blocks of up to MAX_BYTES_READ bytes. After each block
+ * the byte count reported by the tokenizer is added to the running offset and
+ * to the returned total, and the stream is repositioned to that offset before
+ * the next read. The stored offset itself is not updated here.
+ *
+ * @param file_path file to read; NULL yields an empty result
+ * @param delim     delimiter handed to the tokenizer
+ * @param ctx       processor context supplying the uuid
+ * @return the created flow file list and the total bytes reported by the
+ *         tokenizer; zeroed when file_path is NULL, the uuid is missing or
+ *         too long, or the file cannot be opened
+ */
+flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx) {
+    flow_file_info ff_info;
+    memset(&ff_info, 0, sizeof(ff_info));
+
+    if (!file_path) {
+        return ff_info;
+    }
+
+    initialize_content_repo(ctx);
+
+    char uuid_str[37];
+    const char * uuid = get_proc_uuid_from_context(ctx);
+    if (!uuid || strlen(uuid) >= sizeof(uuid_str)) {
+        printf("Invalid processor uuid\n");
+        return ff_info;
+    }
+    printf("uuid = %s\n", uuid);
+    snprintf(uuid_str, sizeof(uuid_str), "%s", uuid);
+
+    char buff[MAX_BYTES_READ + 1];
+    errno = 0;
+    FILE * fp = fopen(file_path, "rb");
+    if (!fp) {
+        printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
+        return ff_info;
+    }
+
+    uint64_t curr_offset = get_current_offset(uuid_str);
+
+    fseek(fp, curr_offset, SEEK_SET);
+
+    flow_file_list * ffl = NULL;
+    size_t bytes_read = 0;
+    while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
+        /* The tokenizer takes a C string, so the block is NUL-terminated first. */
+        buff[bytes_read] = '\0';
+        struct token_list tokens = tokenize_string_tailfile(buff, delim);
+        /*
+         * NOTE(review): when the tokenizer reports 0 bytes the stream is not
+         * repositioned, so the stream position and curr_offset diverge for the
+         * next block. Confirm this is intended.
+         */
+        if (tokens.total_bytes > 0) {
+            ff_info.total_bytes += tokens.total_bytes;
+            curr_offset += tokens.total_bytes;
+            fseek(fp, curr_offset, SEEK_SET);
+        }
+
+        token_node * head;
+        for (head = tokens.head; head && head->data; head = head->next) {
+            flow_file_record * ffr = write_to_flow(head->data, strlen(head->data), ctx);
+            add_attributes(ffr, file_path, curr_offset);
+            add_flow_file_record(&ffl, ffr);
+        }
+        free_all_tokens(&tokens);
+    }
+    fclose(fp);
+    ff_info.ff_list = ffl;
+    return ff_info;
+}
+
+struct properties { /* values returned by get_properties() */
+    char * file_path; /* heap-allocated copy of the "file_path" property; NULL when get_properties() returned early */
+    char delimiter; /* first character of the "delimiter" property, or the decoded \r, \t, \n or \\ escape */
+};
+
+/**
+ * Reads the "file_path" and "delimiter" properties from ctx.
+ *
+ * The delimiter is the first character of the property, except that the
+ * two-character sequences \r, \t, \n and \\ are decoded to the character
+ * they denote. A backslash followed by anything else stays a backslash.
+ *
+ * @param ctx processor context the properties are read from
+ * @return on success, file_path holds a heap-allocated copy of the path
+ *         (the caller frees it) and delimiter holds the decoded character.
+ *         On any failure (missing property, empty delimiter, allocation
+ *         failure) the structure is returned zeroed, i.e. file_path is NULL.
+ */
+struct properties get_properties(processor_context * ctx) {
+    struct properties props;
+    memset(&props, 0, sizeof(props));
+
+    char file_path[4096];
+    char delimiter[3];
+
+    if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
+        return props;
+    }
+
+    if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
+        printf("No delimiter found\n");
+        return props;
+    }
+
+    if (delimiter[0] == '\0') {
+        printf("Delimiter not specified or it is empty\n");
+        return props;
+    }
+    char delim = delimiter[0];
+
+    /* Decode the supported escape sequences; unknown ones leave the backslash as the delimiter. */
+    if (delim == '\\' && delimiter[1] != '\0') {
+        switch (delimiter[1]) {
+            case 'r':
+                delim = '\r';
+                break;
+            case 't':
+                delim = '\t';
+                break;
+            case 'n':
+                delim = '\n';
+                break;
+            case '\\':
+                delim = '\\';
+                break;
+            default:
+                break;
+        }
+    }
+
+    size_t len = strlen(file_path);
+    char * path_copy = (char *)malloc(len + 1);
+    if (!path_copy) {
+        printf("Unable to allocate memory for file path\n");
+        return props;
+    }
+    /* len + 1 also copies the terminating NUL. */
+    memcpy(path_copy, file_path, len + 1);
+
+    props.file_path = path_copy;
+    props.delimiter = delim;
+    return props;
+}
+
+void on_trigger_logaggregator(processor_session * ps, processor_context * ctx) {
+
+    struct properties props = get_properties(ctx);
 
 Review comment:
   Do we really need to reparse these on every on_trigger? Can these change between on_triggers?
This seems inefficient.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message