nifi-issues mailing list archives

From "Marc Parisi (Jira)" <>
Subject [jira] [Commented] (MINIFICPP-1201) Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python Wrapper) To Do ML Inference on Edge
Date Mon, 04 May 2020 17:08:00 GMT


Marc Parisi commented on MINIFICPP-1201:

+1, yeah, totally. Thanks for following up. I think in the e-mail chain it was becoming apparent
that I wasn't correctly relaying my view that H2O-3 needed to be introduced as a corrective

> Integrates MiNiFi C++ with H2O Driverless AI MOJO Scoring Pipeline (C++ Runtime Python
Wrapper) To Do ML Inference on Edge
> --------------------------------------------------------------------------------------------------------------------------
>                 Key: MINIFICPP-1201
>                 URL:
>             Project: Apache NiFi MiNiFi C++
>          Issue Type: New Feature
>    Affects Versions: master
>         Environment: Ubuntu 18.04 in AWS EC2
> MiNiFi C++ 0.7.0
>            Reporter: James Medel
>            Priority: Blocker
>             Fix For: 0.8.0
> *MiNiFi C++ and H2O Driverless AI Integration* via Custom Python Processors:
> Integrates MiNiFi C++ with H2O Driverless AI by using Driverless AI's MOJO Scoring Pipeline
(in the C++ Runtime Python Wrapper) and MiNiFi's Custom Python Processor. A Python Processor
executes the MOJO Scoring Pipeline to do batch scoring or real-time scoring for one or more
predicted labels on tabular test data in the incoming flow file content: if the tabular data
is one row, the MOJO does real-time scoring; if it is multiple rows, the MOJO does batch
scoring. I would like to contribute my processor to MiNiFi C++ as a new feature.
> *1 custom Python processor* created for MiNiFi:
> *H2oMojoPwScoring* - Executes H2O Driverless AI's MOJO Scoring Pipeline in the C++ Runtime
Python Wrapper to do batch scoring or real-time scoring on a frame of data within each incoming
flow file. Requires the user to set the *pipeline.mojo* filepath in the "MOJO Pipeline Filepath"
property. The onTrigger(context, session) function reads this property to get the pipeline.mojo
filepath and *passes it into* the *daimojo.model(pipeline_mojo_filepath)* function to
instantiate the *mojo_scorer*. The MOJO creation time and uuid are added as individual flow file
attributes. The *flow file content* is then *loaded into a datatable frame* to hold the test
data. A Python lambda function called compare checks whether the datatable frame's header
column names equal the expected header column names from the mojo scorer; if they do not, the
frame is assumed to be missing its header, and its header is replaced with the mojo scorer's
expected header. Having the correct header matters because the *mojo scorer's*
*predict(datatable_frame)* function needs the header to do the prediction, returning a
predictions datatable frame. The predict function is *capable of doing real-time scoring or
batch scoring*; which one happens depends only on the number of rows in the tabular data. The
predictions datatable frame is then converted to a pandas dataframe so that pandas'
to_string(index=False) function can convert it to a string without the dataframe's index.
Then *the prediction string is written to the flow file content*. A flow file attribute is
added for the number of rows scored, and one or more additional flow file attributes are added
for each predicted label name and its associated score. Finally, the flow file is transferred
to the success relationship.
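The header check and string conversion described above can be sketched in plain Python. This is a minimal illustration, not the contributed processor's code: pandas stands in for datatable, and the function names (fix_header, predictions_to_string) are invented for the sketch.

```python
import pandas as pd

def fix_header(frame: pd.DataFrame, expected_names: list) -> pd.DataFrame:
    """Compare the frame's column names against the scorer's expected
    names; if they differ, assume the header row is missing and overwrite
    the names with the expected ones, as the description above does."""
    compare = lambda actual, expected: list(actual) == list(expected)
    if not compare(frame.columns, expected_names):
        frame.columns = expected_names
    return frame

def predictions_to_string(predictions: pd.DataFrame) -> str:
    """Serialize the predictions frame without its index, mirroring the
    to_string(index=False) step before the result is written back to
    flow file content."""
    return predictions.to_string(index=False)
```

For example, a frame parsed with auto-generated column names gets the scorer's expected names, and a one-row predictions frame serializes to a compact header-plus-value string.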
> *Hydraulic System Condition Monitoring* Data used in MiNiFi Flow:
> The sensor test data I used in this integration comes from Kaggle: Condition Monitoring
of Hydraulic Systems. With the MiNiFi and H2O integration described above, I was able to
predict hydraulic system cooling efficiency. The use case here is hydraulic system predictive
maintenance.
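Putting the steps together, the scoring portion of the onTrigger flow might look roughly like the sketch below. This is an assumption-laden outline, not the actual processor: the scorer is passed in as a parameter standing in for the object returned by daimojo.model(pipeline_mojo_filepath), the attribute key names are invented for illustration, and pandas stands in for datatable.

```python
import pandas as pd

def score_frame(frame: pd.DataFrame, scorer) -> dict:
    """Sketch of the scoring steps: repair a missing header, run the
    scorer, and return the new flow file content plus attributes.
    `scorer` is assumed to expose feature_names, created_time, uuid,
    and predict(), per the description of the daimojo model object."""
    if list(frame.columns) != list(scorer.feature_names):
        # Header mismatch: treat the header as missing and replace it.
        frame.columns = scorer.feature_names
    predictions = scorer.predict(frame)
    return {
        "attributes": {
            # Illustrative attribute keys; the issue does not name them.
            "mojo.creation.time": str(scorer.created_time),
            "mojo.uuid": str(scorer.uuid),
            "num.rows.scored": str(len(frame)),
        },
        # One input row -> real-time scoring; many rows -> batch scoring.
        "content": predictions.to_string(index=False),
    }
```

In the real processor the returned content and attributes would be written to the flow file and the flow file transferred to the success relationship.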

This message was sent by Atlassian Jira
