nifi-dev mailing list archives

From Russell Bateman <r...@windofkeltia.com>
Subject Re: Reading the incoming flowfile "twice"
Date Fri, 27 Mar 2020 21:22:23 GMT
Joe,

Ah, thanks. I think I have learned a lot today about what's going on down
inside session.read()/write(). I don't have to stand on my head. For
completeness, in case anyone else looks for this answer, here's my code, amended:

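import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;

public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
   FlowFile flowfile = session.get();
   ...

   // Do some processing on the in-coming flowfile then close its input stream. Whatever goes
   // on the output stream becomes the flowfile's new content.
   flowfile = session.write( flowfile, new StreamCallback()
   {
     @Override
     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
     {
       ...

       // processing puts some output on the output stream...
       outputStream.write( etc. );

       inputStream.close();
     }
   } );

   // Do different processing in a second call, passing in the flowfile reference returned by
   // the first call. Note: this inputStream now carries the content written by the first call,
   // not the original in-coming content, and this outputStream replaces that content rather
   // than appending to it (session.append() is the call that appends).
   flowfile = session.write( flowfile, new StreamCallback()
   {
     @Override
     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
     {
       ...

       // processing puts output on the flowfile's output stream...
       outputStream.write( etc. );
     }
   } );

   session.transfer( flowfile, etc. );
}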
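The comments in the code above hinge on how repeated calls treat a flowfile's content. As far as we understand the ProcessSession Javadoc, write( flowfile, StreamCallback ) hands the callback the flowfile's current content and replaces it with whatever goes on the output stream, while append( flowfile, OutputStreamCallback ) adds bytes after the existing content. The plain-Java model below is a sketch only: the class WriteVsAppend is hypothetical, not NiFi code, and represents content as a String so it runs without NiFi.

```java
import java.util.function.UnaryOperator;

// Hypothetical, not NiFi code: a one-field model of a flowfile's content that mimics
// what we understand ProcessSession.write(StreamCallback) and append(OutputStreamCallback) to do.
final class WriteVsAppend {
    private String content;

    WriteVsAppend(String initial) {
        this.content = initial;
    }

    // Like write(flowfile, StreamCallback): the callback sees the current content,
    // and whatever it produces replaces that content.
    void write(UnaryOperator<String> callback) {
        content = callback.apply(content);
    }

    // Like append(flowfile, OutputStreamCallback): no input is offered, and the new
    // output lands after the existing content.
    void append(String moreOutput) {
        content = content + moreOutput;
    }

    String content() {
        return content;
    }
}
```

With two write() calls, a second callback that ignores its input loses the first pass's output, and one that wants to keep it must copy its input through; an append() keeps the earlier output without re-reading it.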

As I'm fond of saying, NiFi just rocks because there's always a solution!

Russ

On 3/27/20 3:08 PM, Joe Witt wrote:
> You should be able to call write as many times as you need. Just keep
> using the resulting flowfile reference in the next call.
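Joe's point about reusing the returned reference can be modeled without NiFi. Our understanding is that a ProcessSession tracks the latest version of each FlowFile and rejects an outdated reference; the hypothetical Session and Ref classes below (not NiFi's API) mimic only that rule, as a sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not NiFi: every write hands back a new reference,
// and an older reference to the same item is rejected.
final class LatestReference {
    static final class Ref {
        final String id;
        final int version;

        Ref(String id, int version) {
            this.id = id;
            this.version = version;
        }
    }

    static final class Session {
        private final Map<String, Integer> current = new HashMap<>();

        Ref create(String id) {
            current.put(id, 0);
            return new Ref(id, 0);
        }

        Ref write(Ref ref) {
            Integer latest = current.get(ref.id);
            if (latest == null || latest != ref.version) {
                throw new IllegalStateException(ref.id + " v" + ref.version + " is not the latest version");
            }
            current.put(ref.id, latest + 1);
            return new Ref(ref.id, latest + 1);
        }
    }

    // Returns true if reusing a pre-write reference is rejected.
    static boolean staleReferenceRejected() {
        Session session = new Session();
        Ref original = session.create("ff-1");
        Ref updated = session.write(original); // correct: keep the returned reference
        session.write(updated);                 // fine: updated was the latest version
        try {
            session.write(original);            // wrong: original is stale
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```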
>
> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com> wrote:
>
>> Mike,
>>
>> Many thanks for responding. Do you mean to say that all I have to do is
>> something like this?
>>
>>      public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>>      {
>>         FlowFile flowfile = session.get();
>>         ...
>>
>>         // this will hold the output stream of our resulting flowfile...
>>         AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>>
>>         /* Do some processing on the in-coming flowfile then close its input stream, but
>>          * save the output stream for continued use.
>>          */
>>         session.write( flowfile, new StreamCallback()
>>         {
>>           @Override
>>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>           {
>>             savedOutputStream.set( outputStream );
>>             ...
>>
>>             // processing puts some output on the output stream...
>>             outputStream.write( etc. );
>>
>>             inputStream.close();
>>           }
>>         } );
>>
>>         /* Start over doing different processing on the (same/reopened) in-coming flowfile,
>>          * continuing to use the original output stream. It's our responsibility to close
>>          * the saved output stream; NiFi closes the new output stream it opened, which we
>>          * ignore.
>>          */
>>         session.write( flowfile, new StreamCallback()
>>         {
>>           @Override
>>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>           {
>>             outputStream = savedOutputStream.get(); // (discard the new output stream)
>>             ...
>>
>>             // processing puts (some more) output on the original output stream...
>>             outputStream.write( etc. );
>>
>>             outputStream.close();
>>           }
>>         } );
>>
>>         session.transfer( flowfile, etc. );
>>      }
>>
>> I'm wondering if this will work to "discard" the new output stream
>> opened for me (the second time) and replace it with the original one,
>> which was probably closed when the first call to session.write()
>> finished. What's on these streams is way too big for me to hold in
>> memory, say, in a ByteArrayOutputStream.
>>
>> Russ
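The suspicion above can be checked in plain Java. In this sketch, a hypothetical withOutput() helper (a stand-in for session.write(), not NiFi's API) hands a callback a stream that is closed as soon as the callback returns, so a reference saved in an AtomicReference is useless afterwards. We believe NiFi's own callback streams behave similarly, though the exact exception NiFi raises is not modeled here. The second method shows the alternative that needs no saved stream: do all of the writing inside one callback.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ScopedStreamDemo {
    // Hypothetical stream that refuses writes once closed, like a stream scoped to one callback.
    static final class ScopedOutputStream extends OutputStream {
        private final OutputStream target;
        private boolean closed;

        ScopedOutputStream(OutputStream target) {
            this.target = target;
        }

        @Override
        public void write(int b) throws IOException {
            if (closed) {
                throw new IOException("stream was closed when its callback returned");
            }
            target.write(b);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Hypothetical helper: the stream exists only while the callback runs.
    static void withOutput(OutputStream sink, Consumer<OutputStream> callback) {
        ScopedOutputStream out = new ScopedOutputStream(sink);
        try {
            callback.accept(out);
        } finally {
            out.close();
        }
    }

    // Saving the stream for later use fails: returns true if the late write throws.
    static boolean savedStreamFails() {
        AtomicReference<OutputStream> saved = new AtomicReference<>();
        withOutput(new ByteArrayOutputStream(), saved::set);
        try {
            saved.get().write('x');
            return false;
        } catch (IOException expected) {
            return true;
        }
    }

    // Writing both parts within a single callback needs no saved stream.
    static String bothPartsInOneCallback() {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        withOutput(sink, out -> {
            try {
                out.write("first half;".getBytes(StandardCharsets.UTF_8));
                out.write("second half".getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }
}
```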
>>
>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>> session.read(FlowFile) just gives you an InputStream. You should be able to
>>> rerun that as many times as you want, provided you properly close it.
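Mike's suggestion amounts to opening a fresh stream for each pass and closing it when done. The sketch below is plain Java: the Supplier<InputStream> parameter is an assumption, standing in for something like () -> session.read( flowfile ) inside a processor, and try-with-resources does the closing.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

final class ReopenDemo {
    // Opens a fresh stream, reads it to the end, and closes it via try-with-resources.
    static String readOnce(Supplier<InputStream> opener) {
        try (InputStream in = opener.get()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Each pass gets its own stream, so the content can be read as many times as needed.
    static String[] readTwice(Supplier<InputStream> opener) {
        return new String[] { readOnce(opener), readOnce(opener) };
    }
}
```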
>>>
>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <russ@windofkeltia.com> wrote:
>>>
>>>> In my custom processor, I'm using a SAX parser to process an incoming
>>>> flowfile that's in XML. However, this particular XML is in essence two
>>>> different files, and I would like to split off the first "half" (which
>>>> starts a couple of lines, i.e. XML elements, into the file) and read and
>>>> process it without using the SAX parser. At the end, I would stream out
>>>> the first half, then the SAX-processed second half.
>>>>
>>>> So, in short:
>>>>
>>>>    1. process the incoming flowfile for the early content without SAX,
>>>>       merely copying it as-is; at all costs I must avoid "reassembling"
>>>>       the first half using my SAX handler (what I'm doing now),
>>>>    2. output the first part down the output stream to the resulting
>>>>       flowfile,
>>>>    3. (re)process the incoming flowfile using SAX (I can just skip over
>>>>       the first bit), spitting the result of this second part out down
>>>>       the output stream of the resulting flowfile.
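The three steps above can be sketched in plain Java as two passes over the same content. Everything here is an assumption for illustration: the Supplier<InputStream> stands in for reopening the flowfile's content, the boundary marker is hypothetical since the message doesn't say how the first half ends, and the SecondPart callback is where the SAX handling would go (the demo merely upper-cases the rest).

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.function.Supplier;

final class TwoPassCopy {
    // Hypothetical hook for step 3, e.g. where a SAX parser would consume the rest.
    interface SecondPart {
        void process(InputStream rest, OutputStream out) throws IOException;
    }

    // Steps 1 and 2: copy bytes as-is through the end of the marker; returns how many were copied.
    static long copyFirstPart(InputStream in, OutputStream out, byte[] marker) throws IOException {
        if (marker.length == 0) throw new IllegalArgumentException("empty marker");
        byte[] window = new byte[marker.length]; // the last marker.length bytes seen
        long count = 0;
        int b;
        while ((b = in.read()) != -1) {
            out.write(b);
            count++;
            System.arraycopy(window, 1, window, 0, window.length - 1);
            window[window.length - 1] = (byte) b;
            if (count >= marker.length && Arrays.equals(window, marker)) {
                break;
            }
        }
        return count;
    }

    // InputStream.skip() may skip fewer bytes than requested, so loop until done or at EOF.
    static void skipFully(InputStream in, long n) throws IOException {
        while (n > 0) {
            long skipped = in.skip(n);
            if (skipped <= 0) {
                if (in.read() == -1) return;
                skipped = 1;
            }
            n -= skipped;
        }
    }

    static void run(Supplier<InputStream> opener, OutputStream out, byte[] marker, SecondPart step3)
            throws IOException {
        long firstLength;
        try (InputStream in = new BufferedInputStream(opener.get())) { // pass 1: copy as-is
            firstLength = copyFirstPart(in, out, marker);
        }
        try (InputStream in = new BufferedInputStream(opener.get())) { // pass 2: reopen, skip, process
            skipFully(in, firstLength);
            step3.process(in, out);
        }
    }

    // Demo wrapper: the "second part" just upper-cases whatever follows the marker.
    static String demo(String xml, String marker) {
        byte[] bytes = xml.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            run(() -> new ByteArrayInputStream(bytes), out, marker.getBytes(StandardCharsets.UTF_8),
                (rest, o) -> o.write(new String(rest.readAllBytes(), StandardCharsets.UTF_8)
                        .toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Only the first pass's byte count is carried between passes, so neither half has to fit in memory.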
>>>>
>>>> I guess this is tantamount to asking how, in Java, I can read an input
>>>> stream twice (or one-and-a-half times). Maybe it's less a NiFi developer
>>>> question and more a Java question. I have looked at it that way too, but
>>>> if one of you knows the best practice (particularly in NiFi), I would
>>>> very much like to hear about it.
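On the plain-Java side of the question, a general InputStream cannot be rewound; the standard options are to reopen the source or, for a bounded prefix, BufferedInputStream's mark()/reset(). The sketch below shows the latter. Note that mark(readLimit) keeps up to readLimit bytes in memory, so it only suits re-reading a small leading part, not a whole large flowfile.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class MarkResetDemo {
    // Reads up to n leading bytes, rewinds with reset(), then reads the whole stream from the start.
    static String[] peekThenReadAll(InputStream raw, int n) {
        try (BufferedInputStream in = new BufferedInputStream(raw)) {
            in.mark(n); // remember this position; up to n bytes will be kept in memory
            byte[] prefix = new byte[n];
            int got = in.readNBytes(prefix, 0, n);
            in.reset(); // rewind to the mark; valid because no more than n bytes were read
            byte[] all = in.readAllBytes();
            return new String[] {
                new String(prefix, 0, got, StandardCharsets.UTF_8),
                new String(all, StandardCharsets.UTF_8)
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```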
>>>>
>>>> Thanks.
>>>>
>>>>
>>

