nifi-dev mailing list archives

From Andrew Grande <apere...@gmail.com>
Subject Re: SplitText processor OOM larger input files
Date Fri, 02 Jun 2017 12:22:39 GMT
1 vcore, which is not even a full core (a shared and oversubscribed CPU
core). I'm not sure what you expected to see when you raised concurrency to
10 :)

There's a lot NiFi is doing behind the scenes, especially around
provenance recording. I don't recommend anything below 4 cores for a
meaningful experience. If in a cloud, go to 8 cores per VM, unless you are
designing for a low footprint with MiNiFi.

Andrew

On Fri, Jun 2, 2017, 6:30 AM Martin Eden <martineden131@gmail.com> wrote:

> Thanks Andrew,
>
> I have added UpdateAttribute processors to update the file names like you
> said. Now it works, writing out 1MB files at a time (I updated the
> MergeContent "Maximum Number of Entries" to 10000 to achieve that, since
> each line in my csv is 100 bytes).
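>
> For reference, the shape of that UpdateAttribute config is roughly the
> following (the exact expression is just an illustration; any expression
> that yields a unique value per flow file would do):
>
>   <properties>
>    <entry> <key>filename</key> <value>${filename}.${now():toNumber()}-${UUID()}</value> </entry>
>   </properties>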
>
> The current flow is:
> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
>   -> RouteOnContent -> MergeContent -> UpdateAttribute -> PutHDFS
>                     -> MergeContent -> UpdateAttribute -> PutHDFS
>                     -> MergeContent -> UpdateAttribute -> PutHDFS
>
> So now let's talk performance.
>
> With a 1 node NiFi running on a Google Compute Engine instance with 1 core
> and 3.7 GB RAM and a 20GB disk, when I feed one 300MB zip file
> (uncompressed 2.5GB csv text) to this flow it is basically never finishing
> the job of transferring all the data.
>
> The inbound queue of RouteOnContent is always red and outbound queues are
> mostly green, which indicates that this processor is the bottleneck. To
> mitigate this I increased its number of concurrent tasks to 10 and then
> observed 10 tasks in progress, outbound queues temporarily red, avg task
> latency increased from 2ms to 20ms, CPU on the box maxed out at 100% by
> the NiFi java process, load avg 5.
>
> I then decreased the number of concurrent tasks of RouteOnContent to 5,
> and the average task time dropped by about half, as expected, with CPU
> still 100% taken by the NiFi java process.
>
> The RouteOnContent has 3 simple regexes that it applies.
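>
> Roughly, its config has this shape (the regexes below are placeholders
> standing in for my real ones; in RouteOnContent each user-defined property
> name becomes a routing relationship):
>
>   <properties>
>    <entry> <key>Match Requirement</key> <value>content must contain match</value> </entry>
>    <entry> <key>route.a</key> <value>^TYPE_A,.*</value> </entry>
>    <entry> <key>route.b</key> <value>^TYPE_B,.*</value> </entry>
>    <entry> <key>route.c</key> <value>^TYPE_C,.*</value> </entry>
>   </properties>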
>
> Questions:
>
> 1. Is it safe to say that I maxed out the performance of this flow on one
> box with 1 core and 3.7 GB RAM?
>
> 2. The performance seems a lot lower than expected, though, which is
> worrying. Is this expected? I am planning to do this at a much larger
> scale, hundreds of GBs.
>
> 3. Is the RouteOnContent that I am using hitting NiFi hard? Is this not a
> recommended use case? Is there anything obviously wrong in my flow?
> Digging around in docs, presentations, and other people's experience, it
> seems that NiFi's sweet spot is routing files based on metadata
> (attributes) and not really based on the actual contents of the files.
>
> 4. Is NiFi suitable for large-scale ETL? Copying and doing simple massaging
> of data from File System A to File System B? From Database A to Database B?
> This is what I am evaluating it for.
>
> I do see how running this on a box with more CPU and RAM and faster disks
> (vertical scaling) would improve the performance, and then adding another
> node to the cluster would help further. But I first want to validate the
> choice of benchmarking flow and understand the performance on one box.
>
> Thanks a lot to all the people helping me on this thread on my NiFi
> evaluation journey. This is a really big plus for the community support
> of NiFi.
>
> M
>
>
>
>
>
>
> On Thu, Jun 1, 2017 at 1:30 PM, Andrew Grande <aperepel@gmail.com> wrote:
>
> > It looks like your max bin size is 1000 entries and 10MB. Every time you
> > hit those, it will write out a merged file. Update the filename attribute
> > to be unique before writing via PutHDFS.
> >
> > Andrew
> >
> > On Thu, Jun 1, 2017, 2:24 AM Martin Eden <martineden131@gmail.com> wrote:
> >
> > > Hi Joe,
> > >
> > > Thanks for the explanations. Really useful in understanding how it
> > > works. Good to know that in the future this will be improved.
> > >
> > > About the appending to HDFS issue let me recap. My flow is:
> > > ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
> > >   -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
> > >                     -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
> > >                     -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
> > >
> > > ListHDFS is monitoring an input folder where 300MB zip files are added
> > > periodically. Each file uncompressed is 2.5 GB csv.
> > >
> > > So I am writing out to HDFS from multiple PutHDFS processors, all of
> > > them having conflict resolution set to *APPEND* and different output
> > > folders.
> > >
> > > The name of the file will, however, be the same *f.csv*. It gets picked
> > > up from the name of the flow files, which bear the name of the original
> > > uncompressed file. This happens, I think, in the MergeContent processor.
> > >
> > > Since all of these processors are running with 1 concurrent task, it
> > > seems that for some reason we cannot append concurrently to HDFS, even
> > > if we are appending to different files in different folders. Any ideas
> > > how to mitigate this?
> > >
> > > It seems other people have encountered this with NiFi
> > > <https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html>
> > > but there is no conclusive solution. It also seems that appending to
> > > HDFS is somewhat problematic
> > > <http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369>.
> > >
> > > So stepping back, the reason I am doing append in PutHDFS is because I
> > > did not manage to find a setting in the MergeContent processors that
> > > allows creation of multiple bundled flow files with the same root name
> > > but different sequence numbers or timestamps (like f.csv.1, f.csv.2,
> > > ...). They all get the same name, which is f.csv. Is that possible
> > > somehow? See my detailed MergeContent processor config below.
> > >
> > > So basically I have a 2.5GB csv file that eventually gets broken up into
> > > lines, and the lines get merged together in bundles of 10 MB, but when
> > > those bundles are emitted to PutHDFS they have the same name as the
> > > original file over and over again. I would like them to have a different
> > > name, based on a timestamp or sequence number let's say, so that I can
> > > avoid the append conflict resolution in PutHDFS, which is causing me
> > > grief right now. Is that possible?
> > >
> > > Thanks,
> > > M
> > >
> > >
> > > Currently my MergeContent processor config is:
> > >   <properties>
> > > *   <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value> </entry>*
> > > *   <entry> <key>Merge Format</key> <value>Binary Concatenation</value> </entry>*
> > >    <entry> <key>Attribute Strategy</key> <value>Keep Only Common Attributes</value> </entry>
> > >    <entry> <key>Correlation Attribute Name</key> </entry>
> > >    <entry> <key>Minimum Number of Entries</key> <value>1</value> </entry>
> > >    <entry> <key>Maximum Number of Entries</key> <value>1000</value> </entry>
> > >    <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
> > > *   <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>*
> > >    <entry> <key>Max Bin Age</key> </entry>
> > >    <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
> > >    <entry> <key>Delimiter Strategy</key> <value>Text</value> </entry>
> > >    <entry> <key>Header File</key> </entry>
> > >    <entry> <key>Footer File</key> </entry>
> > >    <entry> <key>Demarcator File</key> <value></value> </entry>
> > >    <entry> <key>Compression Level</key> <value>1</value> </entry>
> > >    <entry> <key>Keep Path</key> <value>false</value> </entry>
> > >   </properties>
> > >
> > >
> > > On Wed, May 31, 2017 at 3:52 PM, Joe Witt <joe.witt@gmail.com> wrote:
> > >
> > > > Split failed before even with backpressure:
> > > > - yes, backpressure kicks in when destination queues for a given
> > > > processor have reached their target size (in count of flowfiles or
> > > > total size represented).  However, to clarify why the OOM happened, it
> > > > is important to realize that it is not about 'flow files over a quick
> > > > period of time' but rather 'flow files held within a single process
> > > > session'.  Your SplitText was pulling a single flowfile but then
> > > > creating, let's say, 1,000,000 resulting flow files and then committing
> > > > that change.  That happens within a session.  But all those flow file
> > > > objects (not their content) are held in memory, and at such high
> > > > numbers it creates excessive heap usage.  The two-phase divide/conquer
> > > > approach Koji suggested solves that, and eventually we need to solve
> > > > it by swapping out the flowfiles to disk within a session.  We
> > > > actually do swap out flowfiles sitting on queues after a certain
> > > > threshold is reached, for this very reason.  This means you should be
> > > > able to have many millions of flowfiles sitting around in the flow for
> > > > whatever reason and not hit memory problems.
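> > > >
> > > > For reference, that queue swap threshold lives in nifi.properties
> > > > (the value below is the default as I recall it; double-check the
> > > > admin guide for your version):
> > > >
> > > >   # flowfiles per connection above which NiFi swaps them to disk
> > > >   nifi.queue.swap.threshold=20000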
> > > >
> > > > Hope that helps there.
> > > >
> > > > On PutHDFS, it looks like possibly two things are trying to append to
> > > > the same file?  If so, I'd really recommend not appending, but rather
> > > > using MergeContent to create data bundles of a given size and then
> > > > writing those to HDFS.
> > > >
> > > > Thanks
> > > > Joe
> > > >
> > > > On Wed, May 31, 2017 at 10:33 AM, Martin Eden <martineden131@gmail.com>
> > > > wrote:
> > > > > Hi Koji,
> > > > >
> > > > > Good to know that it can handle large files. I thought it was the
> > > > > case but I was just not seeing it in practice.
> > > > >
> > > > > Yes I am using 'Line Split Count' as 1 at SplitText.
> > > > >
> > > > > I added the extra SplitText processor exactly as you suggested and
> > > > > the OOM went away. So, big thanks!!!
> > > > >
> > > > > However I have 2 follow-up questions:
> > > > >
> > > > > 1. Before adding the extra SplitText processor I also played with
> > > > > the back-pressure settings on the outbound queue of the original
> > > > > SplitText processor. Since you mentioned that it is generating files
> > > > > at a rate that is too high, I figured the queue should slow it down.
> > > > > I tried a limit of 100MB or 1000 files and I still got the OOMs in
> > > > > the SplitText processor. Why isn't the queue back-pressure helping
> > > > > me in this case? Where would it come in handy then? Why is the extra
> > > > > SplitText processor needed to fix things, and not just the queue
> > > > > back-pressure?
> > > > >
> > > > > 2. I am now close to completing my flow but I am hitting another
> > > > > error. This time it's the last stage: PutHDFS throws
> > > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to
> > > > > HDFS due to org.apache.nifi.processor.exception.ProcessException:
> > > > > IOException thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > > > See the full stacktrace below.
> > > > > I have a parallelism of 1 for my PutHDFS processors. Any ideas why
> > > > > this is happening?
> > > > >
> > > > > Thanks,
> > > > > Martin
> > > > >
> > > > > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> > > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to
> > > > > HDFS due to org.apache.nifi.processor.exception.ProcessException:
> > > > > IOException thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > > > > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
> > > > > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > > > holder.
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> > > > >         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> > > > >         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> > > > >         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> > > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > > > >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > > > > : {}
> > > > >
> > > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > > > > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > > > > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
> > > > > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > > > holder.
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> > > > >         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> > > > >         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> > > > >         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> > > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > > > >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > > > >
> > > > >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2148)
> > > > >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2095)
> > > > >         at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.java:293)
> > > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:360)
> > > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1678)
> > > > >         at org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)
> > > > >         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> > > > >         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> > > > >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> > > > >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> > > > >         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> > > > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > > >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > >
> > > > > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
> > > > > /nifi_out/unmatched/log20160930.csv for
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > > > holder.
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> > > > >         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> > > > >         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> > > > >         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> > > > >         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> > > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > > > >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> > > > >         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> > > > >         at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> > > > >         at com.sun.proxy.$Proxy188.append(Unknown Source)
> > > > >         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> > > > >         at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)
> > > > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > > >         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> > > > >         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> > > > >         at com.sun.proxy.$Proxy194.append(Unknown Source)
> > > > >         at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
> > > > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
> > > > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
> > > > >         at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
> > > > >         at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
> > > > >         at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> > > > >         at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
> > > > >         at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
> > > > >         at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)
> > > > >         at org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)
> > > > >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2125)
> > > > >         ... 18 common frames omitted
> > > > >
> > > > > On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <ijokarumawak@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Martin,
> > > > >>
> > > > >> Generally, a NiFi processor doesn't load the entire content of a
> > > > >> file and is capable of handling huge files.
> > > > >> However, having a massive number of FlowFiles can cause OOM issues,
> > > > >> as FlowFiles and their attributes reside on the heap.
> > > > >>
> > > > >> I assume you are using a 'Line Split Count' of 1 at SplitText.
> > > > >> We recommend using multiple SplitText processors so as not to
> > > > >> generate many FlowFiles in a short period of time.
> > > > >> For example, the 1st SplitText splits files per 5,000 lines, then
> > > > >> the 2nd SplitText splits into each line.
> > > > >> This way, we can decrease the number of FlowFiles at a given time,
> > > > >> requiring less heap.
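> > > > >>
> > > > >> As a sketch, the only property that differs between the two
> > > > >> SplitText processors is 'Line Split Count':
> > > > >>
> > > > >>   1st SplitText: <entry> <key>Line Split Count</key> <value>5000</value> </entry>
> > > > >>   2nd SplitText: <entry> <key>Line Split Count</key> <value>1</value> </entry>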
> > > > >>
> > > > >> I hope this helps.
> > > > >>
> > > > >> Thanks,
> > > > >> Koji
> > > > >>
> > > > >> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <martineden131@gmail.com>
> > > > >> wrote:
> > > > >> > Hi all,
> > > > >> >
> > > > >> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
> > > > >> >
> > > > >> > The flow I am trying to run is:
> > > > >> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent ->
> > > > >> > MergeContent -> PutHDFS
> > > > >> >
> > > > >> > When I give it a 300MB input zip file (2.5GB uncompressed) I am
> > > > >> > getting a Java OutOfMemoryError, as below.
> > > > >> >
> > > > >> > Does NiFi read the entire contents of files into memory? This is
> > > > >> > unexpected; I thought it chunks through files. Giving more RAM is
> > > > >> > not a solution, as you can always get larger input files in the
> > > > >> > future.
> > > > >> >
> > > > >> > Does this mean NiFi is not suitable as a scalable ETL solution?
> > > > >> >
> > > > >> > Can someone please explain what is happening and how to handle
> > > > >> > large files in NiFi? Any patterns?
> > > > >> >
> > > > >> > Thanks,
> > > > >> > M
> > > > >> >
> > > > >> > ERROR [Timer-Driven Process Thread-9]
> > > > >> > o.a.nifi.processors.standard.SplitText
> > > > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
> > > > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to
> > > > >> > process session due to java.lang.OutOfMemoryError: Java heap
> > > > >> > space: {}
> > > > >> >
> > > > >> > java.lang.OutOfMemoryError: Java heap space
> > > > >> >         at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> > > > >> >         at java.util.HashMap.putMapEntries(HashMap.java:511)
> > > > >> >         at java.util.HashMap.<init>(HashMap.java:489)
> > > > >> >         at org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> > > > >> >         at org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.addAttributes(StandardFlowFileRecord.java:234)
> > > > >> >         at org.apache.nifi.controller.repository.StandardProcessSession.putAllAttributes(StandardProcessSession.java:1723)
> > > > >> >         at org.apache.nifi.processors.standard.SplitText.updateAttributes(SplitText.java:367)
> > > > >> >         at org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(SplitText.java:320)
> > > > >> >         at org.apache.nifi.processors.standard.SplitText.onTrigger(SplitText.java:258)
> > > > >> >         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> > > > >> >         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> > > > >> >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> > > > >> >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> > > > >> >         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> > > > >> >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > >> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > > > >> >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > > >> >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > > >> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > > > >> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > > > >> >         at java.lang.Thread.run(Thread.java:748)
> > > > >>
> > > >
> > >
> >
>
