nifi-dev mailing list archives

From Mike Thomsen <mikerthom...@gmail.com>
Subject Re: question about Nifi Custom processor
Date Wed, 24 Jul 2019 18:28:43 GMT
I see two problems right here. First:

flowfile = session.create();
flowfile = session.get();


The second call replaces the flowfile just created by session.create()
with either null or the next flowfile in the queue. That leaves a flowfile
in the session that you can neither transfer to a downstream relationship
with session.transfer() nor remove with session.remove(), because you no
longer hold a reference to it. The session will then fail to commit because
it doesn't know what to do with that orphaned flowfile. It also looks like
the source of your NullPointerException: that branch only runs when the
queue was empty, so the second session.get() most likely returns null again
and session.read() is handed a null flowfile.
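
To make the bookkeeping concrete, here is a minimal sketch in plain Java. It
does not use the NiFi classes: ToySession and ToyFlowFile are hypothetical
stand-ins that only track which flowfiles still need a decision, and the
IllegalStateException is an assumption of this toy model, not the exception
NiFi itself throws.

```java
import java.util.HashSet;
import java.util.Set;

class OrphanedFlowFileDemo {
    // Toy stand-in for a flowfile; NOT org.apache.nifi.flowfile.FlowFile.
    static final class ToyFlowFile {}

    // Toy stand-in for a session; it only remembers unresolved flowfiles.
    static final class ToySession {
        private final Set<ToyFlowFile> pending = new HashSet<>();

        ToyFlowFile create() {
            ToyFlowFile f = new ToyFlowFile();
            pending.add(f);
            return f;
        }

        // Models an empty incoming queue, as in the log output.
        ToyFlowFile get() {
            return null;
        }

        void transfer(ToyFlowFile f) {
            pending.remove(f);
        }

        void commit() {
            if (!pending.isEmpty()) {
                throw new IllegalStateException(
                        pending.size() + " flowfile(s) neither transferred nor removed");
            }
        }
    }

    // Returns true if the toy session commits without complaint.
    static boolean commitsCleanly(boolean overwriteReference) {
        ToySession session = new ToySession();
        ToyFlowFile flowFile = session.create();
        if (overwriteReference) {
            flowFile = session.get(); // the only reference to the created flowfile is lost
        }
        if (flowFile != null) {
            session.transfer(flowFile);
        }
        try {
            session.commit();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(commitsCleanly(true));  // prints "false"
        System.out.println(commitsCleanly(false)); // prints "true"
    }
}
```

In an actual processor the usual idiom is to call session.get() once and
simply return from onTrigger when it gives back null, rather than creating a
flowfile to fill the gap.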

Second:

new FileReader("JSON_PATH")


That makes java.io.FileReader look for a file literally named "JSON_PATH"
in the current working directory. Unless that is your intent, drop the
quoted literal and pass the configured value instead, for example
context.getProperty(JSON_PATH).getValue() as in your log statements, and
make sure it points to a real file on the file system.
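
The difference between the literal and a path held in a variable can be
sketched with java.io alone. This is only an illustration: the temp file and
the jsonPath variable are made up for the example, and jsonPath stands in for
whatever context.getProperty(JSON_PATH).getValue() returns in the processor.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class PathVsLiteral {
    // Reads the first line of the file at the given path.
    static String firstLine(String path) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            return reader.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample file, created only for this demonstration.
        Path tmp = Files.createTempFile("sample", ".json");
        Files.write(tmp, "{\"k\": 1}".getBytes(StandardCharsets.UTF_8));

        // Stand-in for the configured property value.
        String jsonPath = tmp.toString();
        System.out.println(firstLine(jsonPath));

        // The quoted literal is just a relative file name; this shows where
        // FileReader would look for it.
        System.out.println(new File("JSON_PATH").getAbsolutePath());

        Files.delete(tmp);
    }
}
```

The second line of output is a path ending in JSON_PATH under the JVM's
working directory, which for a processor is wherever NiFi was started and
almost certainly not where the JSON file lives.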

On Tue, Jul 23, 2019 at 10:13 PM Farooq Mustafa <farooq.mustafa@gmail.com>
wrote:

> Hello,
>
>
> I am writing my first custom processor. I followed the example here
> https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/
>
>
> In the settings, I have /tmp/jsoniput/sample1.json file
>
>
> The source code:
>
> public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>
>   final AtomicReference<String> value = new AtomicReference<>();
>   getLogger().warn("sample - Farooq - 1 , value = <" + value.toString() + ">");
>
>   FlowFile flowfile = session.get();
>   getLogger().warn("sample - Farooq - 2");
>
>   if ( null == flowfile) {
>     getLogger().warn("sample - Farooq - 2.1 - flowfile is NULL *** ");
>     flowfile = session.create();
>     flowfile = session.get();
>     getLogger().warn("sample - Farooq - 2.2 - session.create() / session.get() --- NOT SURE ");
>   }
>   getLogger().warn("sample - Farooq - 2.3 JSON_PATH :" + context.getProperty(JSON_PATH).getValue());
>
>   session.read(flowfile, new InputStreamCallback() {
>     @Override
>     public void process(InputStream in) throws IOException {
>
>       try {
>         getLogger().warn("sample - Farooq - 3 JSON_PATH :" + context.getProperty(JSON_PATH).getValue());
>         Object obj = new JSONParser().parse(new FileReader("JSON_PATH"));
>         getLogger().warn("sample - Farooq - 3.1 ");
>
>
>
> The log output:
>
> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
> o.a.nifi.processors.sitc.JsonProcessor
> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 1
> , value = <null>
>
> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
> o.a.nifi.processors.sitc.JsonProcessor
> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2
>
> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
> o.a.nifi.processors.sitc.JsonProcessor
> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq -
> 2.1 - flowfile is NULL ***
>
> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
> o.a.nifi.processors.sitc.JsonProcessor
> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq -
> 2.2 - session.create() / session.get() --- NOT SURE
>
> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4]
> o.a.nifi.processors.sitc.JsonProcessor
> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq -
> 2.3 JSON_PATH :/tmp/jsoninput
>
> 2019-07-20 13:58:42,626 ERROR [Timer-Driven Process Thread-4]
> o.a.nifi.processors.sitc.JsonProcessor
> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267]
> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] failed to process
> session due to java.lang.NullPointerException; Processor Administratively
> Yielded for 1 sec: java.lang.NullPointerException
>
> java.lang.NullPointerException: null
>
> at
> org.apache.nifi.controller.repository.StandardProcessSession.getRecord(StandardProcessSession.java:574)
>
> at
> org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3132)
>
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2187)
>
>
> And the basic flow:
> [image: image.png]
>
>
>
> Any ideas or guidance will be appreciated.
>
>
> Thanks
>
> Farooq
>
