nifi-dev mailing list archives

From Mike Thomsen <mikerthom...@gmail.com>
Subject Re: Reading the incoming flowfile "twice"
Date Sun, 29 Mar 2020 09:14:43 GMT
If these files are only a few MB at the most, you can also just export them
to a ByteArrayOutputStream. Just a thought.
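
A minimal sketch of that idea in plain Java, assuming the content really is small enough to hold in memory. The class and method names (BufferedTwice, buffer, reopen) are made up for illustration, and the ByteArrayInputStream in main is only a stand-in for whatever session.read( flowfile ) returns:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of NiFi: buffer once, then re-read as often as needed.
final class BufferedTwice {

    /** Drains and closes the stream, returning its bytes. Only sensible for small content. */
    static byte[] buffer(InputStream in) {
        try (InputStream source = in; ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = source.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Each call returns a fresh stream positioned at the start of the same bytes. */
    static InputStream reopen(byte[] content) {
        return new ByteArrayInputStream(content);
    }

    public static void main(String[] args) {
        byte[] xml = "<cxml><document>original</document><metadata/></cxml>"
                .getBytes(StandardCharsets.UTF_8);

        // Stand-in for the flowfile's input stream (assumption for this sketch).
        byte[] content = buffer(new ByteArrayInputStream(xml));

        // Two independent passes over the same content.
        String first = new String(buffer(reopen(content)), StandardCharsets.UTF_8);
        String second = new String(buffer(reopen(content)), StandardCharsets.UTF_8);
        System.out.println(first.equals(second)); // prints "true"
    }
}
```

The trade-off is heap usage: the whole flowfile sits in memory for as long as the byte array is referenced, so this only makes sense for the "few MB at the most" case.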

On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <russ@windofkeltia.com>
wrote:

> Joe and Mike,
>
> Sadly, I was not able to get very far on this. It seems that, to the
> extent that I copy the first half of the contents of the input stream, I
> lose what comes after when I try to read again: basically, the second
> half comprising the <metadata> and <demographics> elements, which I was
> hoping to SAX-parse. Here's the code and output. I have highlighted the
> output to make it easier to read.
>
> try
> {
>   InputStream inputStream = session.read( flowfile );
>   System.out.println( "This is the input stream first time around (before copying to output stream)..." );
>   System.out.println( StreamUtilities.fromStream( inputStream ) );
>   inputStream.close();
> }
> catch( IOException e )
> {
>   e.printStackTrace();
> }
> flowfile = session.write( flowfile, new StreamCallback()
> {
>   @Override
>   public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>   {
>     System.out.println( "And now, let's copy..." );
>     CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
>   }
> } );
> try
> {
>   InputStream inputStream = session.read( flowfile );
>   System.out.println( "This is the input stream second time around (after copying)..." );
>   System.out.println( StreamUtilities.fromStream( inputStream ) );
>   inputStream.close();
> }
> catch( IOException e )
> {
>   e.printStackTrace();
> }
> // ...on to SAX parser which dies because the input has been truncated to
> // exactly what was written out to the output stream
>
>
> Output of above:
>
> This is the input stream first time around (before copying to output
> stream)...
> <cxml>
>    <document>
>      This is the original document.
>    </document>
>    <metadata>
>      <date_of_service>2016-06-28 13:23</date_of_service>
>    </metadata>
>    <demographics>
>      <date_of_birth>1980-07-01</date_of_birth>
>      <age>36</age>
>    </demographics>
> </cxml>
>
> And now, let's copy...
> This is the input stream second time around (after copying)...
> <cxml>
>    <document>
>      This is the original document.
>    </document>
> And now, we'll go on to the SAX parser...
> <cxml> <document> This is the original document. </document>
> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
> document structures must start and end within the same entity.
>
>
> I left off the code that prints, "And now, we'll go on to the SAX
> parser..." It's in the next flowfile = session.write( ... ). I have unit
> tests that verify the good functioning of
> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
> "file" is truncated; SAX finds the first "half" just fine, but there is
> no second "half". If I comment out copying from input stream to output
> stream, the error doesn't occur--the whole document is there.
>
> Thanks for looking at this again if you can,
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
> > you should be able to call write as many times as you need.  just keep
> > using the resulting flowfile reference into the next call.
> >
> > On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com>
> > wrote:
> >
> >> Mike,
> >>
> >> Many thanks for responding. Do you mean to say that all I have to do is
> >> something like this?
> >>
> >>      public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> >>      {
> >>         FlowFile flowfile = session.get();
> >>         ...
> >>
> >>         // this will hold the output stream of our resulting flowfile...
> >>         AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
> >>
> >>         /* Do some processing on the in-coming flowfile then close its input stream, but
> >>          * save the output stream for continued use.
> >>          */
> >>         session.write( flowfile, new StreamCallback()
> >>         {
> >>           @Override
> >>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> >>           {
> >>             savedOutputStream.set( outputStream );
> >>             ...
> >>
> >>             // processing puts some output on the output stream...
> >>             outputStream.write( etc. );
> >>
> >>             inputStream.close();
> >>           }
> >>         } );
> >>
> >>         /* Start over doing different processing on the (same/reopened) in-coming flowfile,
> >>          * continuing to use the original output stream. It's our responsibility to close
> >>          * the saved output stream; NiFi closes the unused output stream it opened, which
> >>          * we ignore.
> >>          */
> >>         session.write( flowfile, new StreamCallback()
> >>         {
> >>           @Override
> >>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> >>           {
> >>             outputStream = savedOutputStream.get(); // (discard the new output stream)
> >>             ...
> >>
> >>             // processing puts (some more) output on the original output stream...
> >>             outputStream.write( etc. );
> >>
> >>             outputStream.close();
> >>           }
> >>         } );
> >>
> >>         session.transfer( flowfile, etc. );
> >>      }
> >>
> >> I'm wondering if this will work to "discard" the new output stream
> >> opened for me (the second time) and replace it with the original one,
> >> which was probably closed when the first call to
> >> session.write() finished. What's on these streams is way too big for me
> >> to put into temporary memory, say, a ByteArrayOutputStream.
> >>
> >> Russ
> >>
> >> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> >>> session.read(FlowFile) just gives you an InputStream. You should be able to
> >>> rerun that as many times as you want provided you properly close it.
> >>>
> >>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <russ@windofkeltia.com>
> >>> wrote:
> >>>
> >>>> In my custom processor, I'm using a SAX parser to process an incoming
> >>>> flowfile that's in XML. However, this particular XML is in essence
> >>>> two different files, and I would like to split off, read and process the
> >>>> first "half" (which starts a couple of lines, i.e. XML elements, into the
> >>>> file) without using the SAX parser. At the end, I would stream the output
> >>>> of the first half, then the SAX-processed second half.
> >>>>
> >>>> So, in short:
> >>>>
> >>>>    1. process the incoming flowfile for the early content not using SAX,
> >>>>       but merely copying as-is; at all cost I must avoid "reassembling"
> >>>>       the first half using my SAX handler (what I'm doing now),
> >>>>    2. output the first part down the output stream to the resulting flowfile,
> >>>>    3. (re)process the incoming flowfile using SAX (and I can just skip
> >>>>       over the first bit), spitting the result of this second part out
> >>>>       down the output stream of the resulting flowfile.
> >>>>
> >>>> I guess this is tantamount to asking how, in Java, I can read an input
> >>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
> >>>> developer question and more a Java question. I have looked at it that
> >>>> way too, but, if one of you knows (particularly NiFi) best practice, I
> >>>> would very much like to hear about it.
> >>>>
> >>>> Thanks.
> >>>>
> >>>>
> >>
>
>
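
For the three steps in the original question (copy the first part as-is, then SAX-parse the rest), here is a rough plain-Java sketch of the two-pass shape. It assumes the source can be reopened from the start for each pass; the Supplier<InputStream> is a stand-in for that, not a NiFi API. It also assumes the first "half" ends at the first </document>, as in the sample above. TwoPassSketch, copyFirstHalf and the name=value lines written for the second half are placeholders for illustration only, not the real processing:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical sketch: pass 1 copies raw bytes, pass 2 re-reads from the start with SAX.
final class TwoPassSketch {
    // Assumption: the first "half" ends at the first occurrence of this marker.
    static final String END_OF_FIRST_HALF = "</document>";

    /** Pass 1: copy bytes unchanged, up to and including the marker. */
    static void copyFirstHalf(InputStream in, OutputStream out) throws IOException {
        byte[] marker = END_OF_FIRST_HALF.getBytes(StandardCharsets.US_ASCII);
        int matched = 0;
        int b;
        while (matched < marker.length && (b = in.read()) != -1) {
            out.write(b);
            // Simple restart rule; valid here because '<' occurs only at marker[0].
            matched = (b == marker[matched]) ? matched + 1 : (b == marker[0] ? 1 : 0);
        }
    }

    /** Pass 2 handler: skips <document>, records leaf text of everything else. */
    static final class SecondHalfHandler extends DefaultHandler {
        final StringBuilder result = new StringBuilder();
        private final StringBuilder text = new StringBuilder();
        private boolean inDocument;

        @Override
        public void startElement(String uri, String localName, String qName, Attributes atts) {
            if (qName.equals("document")) inDocument = true;
            text.setLength(0);
        }

        @Override
        public void characters(char[] ch, int start, int length) {
            if (!inDocument) text.append(ch, start, length);
        }

        @Override
        public void endElement(String uri, String localName, String qName) {
            String value = text.toString().trim();
            if (qName.equals("document")) {
                inDocument = false;
            } else if (!inDocument && !value.isEmpty()) {
                result.append(qName).append('=').append(value).append('\n');
            }
            text.setLength(0);
        }
    }

    /** Opens the source twice: once for the raw copy, once for SAX. */
    static byte[] process(Supplier<InputStream> source) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SecondHalfHandler handler = new SecondHalfHandler();
        try (InputStream first = source.get()) {
            copyFirstHalf(first, out);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        try (InputStream second = source.get()) {
            SAXParserFactory.newInstance().newSAXParser().parse(second, handler);
        } catch (IOException | SAXException | ParserConfigurationException e) {
            throw new IllegalStateException(e);
        }
        byte[] tail = handler.result.toString().getBytes(StandardCharsets.UTF_8);
        out.write(tail, 0, tail.length);
        return out.toByteArray();
    }
}
```

The point of the shape is that both passes read the untouched original while the output goes somewhere else. That matches the output above, where reading the flowfile after session.write() returned only what had been written.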
