nifi-dev mailing list archives

From Otto Fowler <ottobackwa...@gmail.com>
Subject Re: Reading the incoming flowfile "twice"
Date Tue, 31 Mar 2020 16:36:50 GMT
Oh, sorry, I did not understand that you needed to do that.

On March 31, 2020 at 11:22:20, Russell Bateman (russ@windofkeltia.com) wrote:

Yes, I thought of that, but there's no way to insert the missing enclosing
XML structure into the input stream ahead of <metadata>. SAX will choke if
I just start feeding it the flowfile where I left off copying, right after
</document>.
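Here is a minimal sketch of one way around this, using only the JDK. `java.io.SequenceInputStream` lets you prepend bytes to a stream without buffering it. Assume three things: the stream has already been consumed through `</document>`, what remains still ends with the original `</cxml>` closing tag, and the content is UTF-8. Under those assumptions, prepending a synthetic `<cxml>` start tag makes the remainder well-formed again. `RemainderParser`, `withSyntheticRoot` and `elementNames` are hypothetical names, and the handler only records element names. If the real root start tag carries attributes or namespace declarations that the SAX part depends on, the synthetic tag would have to repeat them.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

class RemainderParser {
    /** Prepends a synthetic start tag so the tail of the document is well-formed again. */
    static InputStream withSyntheticRoot(String rootStartTag, InputStream remainder) {
        InputStream prefix = new ByteArrayInputStream(rootStartTag.getBytes(StandardCharsets.UTF_8));
        return new SequenceInputStream(prefix, remainder); // reads prefix first, then remainder
    }

    /** Parses the re-rooted remainder and returns the element names SAX reported, in order. */
    static List<String> elementNames(InputStream remainder) throws Exception {
        List<String> names = new ArrayList<>();
        SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
        parser.parse(withSyntheticRoot("<cxml>", remainder), new DefaultHandler() {
            @Override
            public void startElement(String uri, String localName, String qName, Attributes attrs) {
                names.add(qName); // hypothetical stand-in for real SAX processing
            }
        });
        return names;
    }

    public static void main(String[] args) throws Exception {
        // What is left of the sample document once everything through </document> has been read.
        String tail = "\n <metadata>\n  <date_of_service>2016-06-28 13:23</date_of_service>\n </metadata>\n"
                + " <demographics>\n  <date_of_birth>1980-07-01</date_of_birth>\n  <age>36</age>\n"
                + " </demographics>\n</cxml>\n";
        System.out.println(elementNames(new ByteArrayInputStream(tail.getBytes(StandardCharsets.UTF_8))));
    }
}
```

The same wrapping works on any partially consumed stream, including one obtained inside a NiFi callback, because nothing is rewound or copied into memory.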

On 3/30/20 8:25 PM, Otto Fowler wrote:
> Can I ask why you would consume the whole stream when doing the non-sax
> part? If you consume the stream right up to the sax part ( the stream POS
> is at the start of the xml ) then you can just pass the stream to sax as
is
> can’t you?
>
>
>
>
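A minimal plain-Java sketch of Otto's suggestion follows. It copies bytes from the input to the output until a marker such as `</document>` has been written, then stops, so the input is left positioned exactly where the SAX part begins. `copyThrough` is a hypothetical helper. It is not the `CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput()` mentioned elsewhere in the thread, whose code is not shown. The sketch assumes a non-empty marker that appears literally in the bytes, so a marker inside a comment or CDATA section would fool it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class MarkerCopy {
    /**
     * Copies bytes from in to out up to and including the first occurrence of marker.
     * Returns true if the marker was found; 'in' is then positioned just past it.
     * Reads one byte at a time, so for real data wrap 'in' in a BufferedInputStream
     * first and keep using that same buffered stream afterwards.
     */
    static boolean copyThrough(InputStream in, OutputStream out, byte[] marker) throws IOException {
        byte[] window = new byte[marker.length]; // the last marker.length bytes seen
        long count = 0;
        int b;
        while ((b = in.read()) != -1) {
            out.write(b);
            count++;
            System.arraycopy(window, 1, window, 0, window.length - 1); // slide left by one byte
            window[window.length - 1] = (byte) b;
            if (count >= marker.length && Arrays.equals(window, marker)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        String doc = "<cxml>\n <document>\n  text\n </document>\n <metadata/>\n</cxml>\n";
        InputStream in = new ByteArrayInputStream(doc.getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        copyThrough(in, out, "</document>".getBytes(StandardCharsets.UTF_8));
        System.out.println("copied:    " + new String(out.toByteArray(), StandardCharsets.UTF_8));
        // Whatever is still unread in 'in' is what would be handed to the SAX parser.
        System.out.println("remaining: " + new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
}
```

As Russell points out, the remaining bytes are not a well-formed document on their own. Something still has to restore the root element before SAX sees them.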
> On March 30, 2020 at 16:23:27, Russell Bateman (russ@windofkeltia.com) wrote:
>
> If I haven't worn out my welcome, here is simplified code that should
> demonstrate either that I have miscoded your suggestions or that the API
> doesn't in fact work as advertised. First, the output. The code (both the
> JUnit test and the processor) is attached; the files are pretty small.
>
> Much thanks,
> Russ
>
> This is the input stream first time around (before copying)
> ===================================
> * * * session.read( flowfile );
> Here's what's in input stream:
> <cxml>
>   <document>
>     This is the original document.
>   </document>
>   <metadata>
>     <date_of_service>2016-06-28 13:23</date_of_service>
>   </metadata>
>   <demographics>
>     <date_of_birth>1980-07-01</date_of_birth>
>     <age>36</age>
>   </demographics>
> </cxml>
>
> And now, let's copy some of the input stream to the output stream
> =============================
> * * * flowfile = session.write( flowfile, new StreamCallback() ...
> Copying input stream to output stream up to </document>...
> The output stream has in it at this point:
> <cxml>
>   <document>
>     This is the original document.
>   </document>
>
> [1. When we examine the output stream, it has what we expect.]
>
> After copying, can we reopen the input stream intact, and does the output
> stream have what we think?
> ====
> * * * flowfile = session.write( flowfile, new StreamCallback() ...
> Here's what's in input stream:
> <cxml>
>   <document>
>     This is the original document.
>   </document>
>
> [2. The input stream as reported just above is truncated by exactly the
> content we did not copy to the output stream. We expected to see the
> entire, original file, but the second half is gone.]
>
> Here's what's in the output stream at this point:
> (nothing)
>
> [3. The content we copied to the output stream has disappeared. Does it
> disappear simply because we looked at it (printed it out here)?]
>
>
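Observations 2 and 3 are consistent with how `ProcessSession.write(FlowFile, StreamCallback)` is documented to behave. The callback's `InputStream` is the flowfile's *current* content, and whatever the callback writes to its `OutputStream` *replaces* that content. The second `write()` therefore reads back only what the first one wrote. Because the second callback writes nothing, the content ends up empty, and printing it has nothing to do with that. The toy model below is not NiFi code: `ToyFlowFile` and its nested `Callback` interface are invented for illustration. It only mimics that replace-on-write rule, so the sequence above can be replayed in plain Java.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class ToyFlowFile {
    /** Invented stand-in with the same shape as NiFi's StreamCallback. */
    interface Callback {
        void process(InputStream in, OutputStream out) throws IOException;
    }

    private final byte[] content;

    ToyFlowFile(byte[] content) {
        this.content = content;
    }

    String text() {
        return new String(content, StandardCharsets.UTF_8);
    }

    /** Replace-on-write: the returned file holds only what the callback wrote. */
    ToyFlowFile write(Callback callback) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        callback.process(new ByteArrayInputStream(content), out);
        return new ToyFlowFile(out.toByteArray());
    }

    public static void main(String[] args) throws IOException {
        ToyFlowFile ff = new ToyFlowFile(
                "<cxml><document>doc</document><metadata/></cxml>".getBytes(StandardCharsets.UTF_8));
        // First write: copy only up to and including </document>.
        ff = ff.write((in, out) -> {
            String all = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            int end = all.indexOf("</document>") + "</document>".length();
            out.write(all.substring(0, end).getBytes(StandardCharsets.UTF_8));
        });
        System.out.println(ff.text());             // truncated, as in observation 2
        // Second write: reads the (already truncated) content but writes nothing.
        ff = ff.write((in, out) -> in.readAllBytes());
        System.out.println("[" + ff.text() + "]"); // empty, as in observation 3
    }
}
```

Under this rule, the first half and the SAX-processed second half have to be produced by the *same* callback if both are to end up in the result.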
> On 3/29/20 5:05 AM, Joe Witt wrote:
>
> Russell
>
> I recommend writing very simple code that does two successive read/write
> operations on basic data so you can make sure the API works as expected.
> Then add the XML bits.
>
> Thanks
>
> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mikerthomsen@gmail.com> wrote:
>
>
> If these files are only a few MB at most, you can also just copy them
> into a ByteArrayOutputStream. Just a thought.
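For content that fits comfortably in memory, Mike's suggestion amounts to this: drain the stream once into a `ByteArrayOutputStream`, then open as many independent `ByteArrayInputStream`s over the resulting bytes as needed. Below is a minimal JDK-only sketch. `BufferTwice` and `buffer` are hypothetical names, and the in-memory stream stands in for the one a processor would get from `session.read( flowfile )`. It holds the entire flowfile in heap, which Russell says elsewhere in this thread is not an option for his data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class BufferTwice {
    /** Drains the stream once into memory; only sensible for small payloads. */
    static byte[] buffer(InputStream in) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.transferTo(bytes); // Java 9+
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the stream a processor would get from session.read( flowfile ).
        byte[] content = buffer(new ByteArrayInputStream("<cxml>...</cxml>".getBytes(StandardCharsets.UTF_8)));
        // Each pass gets its own fresh stream over the same bytes.
        try (InputStream firstPass = new ByteArrayInputStream(content);
             InputStream secondPass = new ByteArrayInputStream(content)) {
            System.out.println(new String(firstPass.readAllBytes(), StandardCharsets.UTF_8));
            System.out.println(new String(secondPass.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```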
>
> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <russ@windofkeltia.com> wrote:
>
>
> Joe and Mike,
>
> Sadly, I was not able to get very far with this. It seems that, to the
> extent that I copy the first half of the input stream's contents, I lose
> what comes after when I try to read again: basically, the second half,
> comprising the <metadata> and <demographics> elements, which I was hoping
> to SAX-parse. Here's the code and output. I have highlighted the output
> to make it easier to read.
>
> try
> {
>     InputStream inputStream = session.read( flowfile );
>     System.out.println( "This is the input stream first time around (before copying to output stream)..." );
>     System.out.println( StreamUtilities.fromStream( inputStream ) );
>     inputStream.close();
> }
> catch( IOException e )
> {
>     e.printStackTrace();
> }
> flowfile = session.write( flowfile, new StreamCallback()
> {
>     @Override
>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>     {
>         System.out.println( "And now, let's copy..." );
>         CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
>     }
> } );
> try
> {
>     InputStream inputStream = session.read( flowfile );
>     System.out.println( "This is the input stream second time around (after copying)..." );
>     System.out.println( StreamUtilities.fromStream( inputStream ) );
>     inputStream.close();
> }
> catch( IOException e )
> {
>     e.printStackTrace();
> }
> // ...on to the SAX parser, which dies because the input has been truncated
> // to exactly what was written out to the output stream
>
>
> Output of above:
>
> This is the input stream first time around (before copying to output stream)...
> <cxml>
> <document>
> This is the original document.
> </document>
> <metadata>
> <date_of_service>2016-06-28 13:23</date_of_service>
> </metadata>
> <demographics>
> <date_of_birth>1980-07-01</date_of_birth>
> <age>36</age>
> </demographics>
> </cxml>
>
> And now, let's copy...
> This is the input stream second time around (after copying)...
> <cxml>
> <document>
> This is the original document.
> </document>
> And now, we'll go on to the SAX parser...
> <cxml> <document> This is the original document. </document>
> [pool-1-thread-1] ERROR [...] SAX ruleparser error: org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML document structures must start and end within the same entity.
>
>
> I left out the code that prints "And now, we'll go on to the SAX
> parser..."; it's in the next flowfile = session.write( ... ). I have unit
> tests that verify that copyCxmlHeaderAndDocumentToOutput() works
> correctly. The SAX error occurs because the "file" is truncated: SAX finds
> the first "half" just fine, but there is no second "half". If I comment
> out the copying from input stream to output stream, the error doesn't
> occur; the whole document is there.
>
> Thanks for looking at this again if you can,
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
>
> You should be able to call write() as many times as you need. Just keep
> using the resulting flowfile reference in the next call.
>
> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <russ@windofkeltia.com> wrote:
>
>
> Mike,
>
> Many thanks for responding. Do you mean to say that all I have to do is
> something like this?
>
> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> {
>     FlowFile flowfile = session.get();
>     ...
>
>     // this will be our resulting flowfile...
>     AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>
>     /* Do some processing on the in-coming flowfile, then close its input stream, but
>      * save the output stream for continued use.
>      */
>     session.write( flowfile, new StreamCallback()
>     {
>         @Override
>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>         {
>             savedOutputStream.set( outputStream );
>             ...
>
>             // processing puts some output on the output stream...
>             outputStream.write( etc. );
>
>             inputStream.close();
>         }
>     } );
>
>     /* Start over doing different processing on the (same/reopened) in-coming flowfile,
>      * continuing to use the original output stream. It's our responsibility to close
>      * the saved output stream; NiFi closes the unused output stream it opened, but
>      * which we ignore.
>      */
>     session.write( flowfile, new StreamCallback()
>     {
>         @Override
>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>         {
>             outputStream = savedOutputStream.get(); // (discard the new output stream)
>             ...
>
>             // processing puts (some more) output on the original output stream...
>             outputStream.write( etc. );
>
>             outputStream.close();
>         }
>     } );
>
>     session.transfer( flowfile, etc. );
> }
>
> I'm wondering whether this will work to "discard" the new output stream
> opened for me (the second time) and replace it with the original one,
> which was probably closed when the first call to session.write()
> finished. What's on these streams is far too big for me to hold in
> memory, say, in a ByteArrayOutputStream.
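One approach avoids both buffering and carrying an output stream from one `write()` call into the next. It does both halves inside a single `StreamCallback`: copy the first part verbatim, then hand the *same*, partially consumed input stream to SAX and write SAX's results to the *same* output stream. Reassigning the `outputStream` parameter in the proposal above only changes a local variable. It cannot redirect where NiFi stores the second write's content, and, as Russell suspects, the first stream is unlikely to be usable after the first callback has returned.

The sketch below is plain Java so that it runs outside NiFi. Its `process(InputStream, OutputStream)` has the same shape as `StreamCallback.process`, so in a processor its body would sit in one `session.write( flowfile, new StreamCallback() { ... } )` call, and the returned flowfile reference would then be transferred. It rests on several assumptions:

- `</document>` appears literally, once, before the SAX part.
- The remainder ends with `</cxml>`.
- The content is UTF-8.
- `EchoHandler` merely echoes the SAX events back out as XML, as a placeholder for real processing.

`SinglePassCxml` and `EchoHandler` are hypothetical names.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.SequenceInputStream;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

class SinglePassCxml {
    private static final byte[] MARKER = "</document>".getBytes(StandardCharsets.UTF_8);
    private static final String ROOT_START = "<cxml>"; // assumed root, taken from the sample data

    /** Same shape as StreamCallback.process(): one pass over the input, all results to out. */
    static void process(InputStream rawIn, OutputStream out) throws IOException {
        InputStream in = new BufferedInputStream(rawIn); // from here on read only 'in', never rawIn
        // Phase 1: copy the first "half" verbatim, stopping right after MARKER.
        byte[] window = new byte[MARKER.length];
        long copied = 0;
        boolean found = false;
        int b;
        while (!found && (b = in.read()) != -1) {
            out.write(b);
            copied++;
            System.arraycopy(window, 1, window, 0, window.length - 1);
            window[window.length - 1] = (byte) b;
            found = copied >= MARKER.length && Arrays.equals(window, MARKER);
        }
        if (!found) {
            throw new IOException("marker not found");
        }
        // Phase 2: SAX-parse the rest of the same stream, re-rooted with a synthetic start tag.
        InputStream rest = new SequenceInputStream(
                new ByteArrayInputStream(ROOT_START.getBytes(StandardCharsets.UTF_8)), in);
        Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
        try {
            SAXParserFactory.newInstance().newSAXParser().parse(rest, new EchoHandler(writer));
        } catch (ParserConfigurationException | SAXException e) {
            throw new IOException(e);
        }
        writer.flush(); // flush but do not close: the caller owns 'out'
    }

    /** Placeholder processing: echoes SAX events as XML, skipping the synthetic root's start tag. */
    static final class EchoHandler extends DefaultHandler {
        private final Writer writer;
        private int depth = 0;

        EchoHandler(Writer writer) { this.writer = writer; }

        @Override
        public void startElement(String uri, String localName, String qName, Attributes atts) throws SAXException {
            if (depth++ == 0) {
                return; // phase 1 already wrote the real <cxml> start tag
            }
            StringBuilder tag = new StringBuilder("<").append(qName);
            for (int i = 0; i < atts.getLength(); i++) {
                tag.append(' ').append(atts.getQName(i)).append("=\"").append(escape(atts.getValue(i))).append('"');
            }
            emit(tag.append('>').toString());
        }

        @Override
        public void endElement(String uri, String localName, String qName) throws SAXException {
            depth--;
            emit("</" + qName + ">"); // includes the closing </cxml> of the real root
        }

        @Override
        public void characters(char[] ch, int start, int length) throws SAXException {
            emit(escape(new String(ch, start, length)));
        }

        private void emit(String s) throws SAXException {
            try {
                writer.write(s);
            } catch (IOException e) {
                throw new SAXException(e);
            }
        }

        private static String escape(String s) {
            return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;");
        }
    }

    public static void main(String[] args) throws IOException {
        String doc = "<cxml>\n <document>\n  This is the original document.\n </document>\n"
                + " <metadata>\n  <date_of_service>2016-06-28 13:23</date_of_service>\n </metadata>\n</cxml>\n";
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        process(new ByteArrayInputStream(doc.getBytes(StandardCharsets.UTF_8)), out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Memory use stays bounded by the stream buffers and whatever the SAX handler itself retains, so the flowfile size is not the constraint.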
>
> Russ
>
> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>
> session.read(FlowFile) just gives you an InputStream. You should be able
> to rerun that as many times as you want, provided you close it properly.
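Mike's point, illustrated in plain Java: you do not rewind a stream, you open a fresh one. In this sketch a `Callable<InputStream>` plays the role that `session.read( flowfile )` plays in a processor. The `opener` parameter and the `TwoPasses`, `offsetAfter` and `openAt` names are hypothetical. The first pass only locates the byte offset just past `</document>`, and the second pass skips straight to that offset. Each stream is closed when its pass ends, per Mike's proviso. In NiFi this only sees the original content if no `session.write()` has replaced it in between.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.Callable;

class TwoPasses {
    /** Pass 1: offset just past the first occurrence of a non-empty marker, or -1 if absent. */
    static long offsetAfter(Callable<InputStream> opener, byte[] marker) throws Exception {
        try (InputStream in = opener.call()) { // a fresh stream, closed when this pass ends
            byte[] window = new byte[marker.length];
            long pos = 0;
            int b;
            while ((b = in.read()) != -1) {
                pos++;
                System.arraycopy(window, 1, window, 0, window.length - 1);
                window[window.length - 1] = (byte) b;
                if (pos >= marker.length && Arrays.equals(window, marker)) {
                    return pos;
                }
            }
            return -1;
        }
    }

    /** Pass 2: another fresh stream, advanced to offset. The caller must close it. */
    static InputStream openAt(Callable<InputStream> opener, long offset) throws Exception {
        InputStream in = opener.call();
        long remaining = offset;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) { // skip() may make no progress; fall back to read()
                if (in.read() == -1) {
                    in.close();
                    throw new EOFException("stream is shorter than the requested offset");
                }
                skipped = 1;
            }
            remaining -= skipped;
        }
        return in;
    }

    public static void main(String[] args) throws Exception {
        byte[] doc = "<cxml><document>doc</document><metadata/></cxml>".getBytes(StandardCharsets.UTF_8);
        Callable<InputStream> opener = () -> new ByteArrayInputStream(doc); // stands in for session.read( flowfile )
        long offset = offsetAfter(opener, "</document>".getBytes(StandardCharsets.UTF_8));
        if (offset < 0) {
            throw new IllegalStateException("marker not found");
        }
        try (InputStream rest = openAt(opener, offset)) {
            System.out.println(new String(rest.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```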
>
> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <russ@windofkeltia.com> wrote:
>
>
> In my custom processor, I'm using a SAX parser to process an incoming
> flowfile that's in XML. However, this particular XML is in essence two
> different files, and I would like to split it, reading and processing the
> first "half" (which starts a couple of lines, i.e., XML elements, into the
> file) without using the SAX parser. At the end, I would stream out the
> first half, followed by the SAX-processed second half.
>
> So, in short:
>
> 1. process the incoming flowfile for the early content without using SAX,
>    merely copying it as-is; at all costs I must avoid "reassembling" the
>    first half using my SAX handler (which is what I'm doing now),
> 2. output the first part down the output stream to the resulting
>    flowfile,
> 3. (re)process the incoming flowfile using SAX (I can just skip over the
>    first bit) and spit the result of this second part out down the output
>    stream of the resulting flowfile.
>
> I guess this is tantamount to asking how, in Java, I can read an input
> stream twice (or one-half plus one times). Maybe it's less a NiFi
> developer question and more a Java question. I have looked at it that way
> too, but if one of you knows the best practice (particularly for NiFi), I
> would very much like to hear about it.
>
> Thanks.
>
