flume-user mailing list archives

From Prasad Mujumdar <pras...@cloudera.com>
Subject Re: java.lang.InterruptedException in RollSink
Date Tue, 20 Dec 2011 18:04:22 GMT
   I guess you'll also need https://issues.apache.org/jira/browse/FLUME-762 along with FLUME-798.

thanks
Prasad

On Tue, Dec 20, 2011 at 3:10 AM, Dawid Rasinski <dawid.rasinski@gmx.net> wrote:

> Hello,
>
> I have been doing some experiments with Flume lately and stumbled upon a
> problem I cannot solve. I was running performance tests with three collectors
> (each with a heap size of 2500 MB) and four agents. Each agent reads a data
> file (1.1 GB, ~30 kB per line) using a tail source (starting from the
> beginning), makes batches of 500 events, and sends these to all three
> collectors using a fanout sink. This should generate a lot of incoming
> traffic for the collectors. The corresponding configuration is:
>
> senderA: tail("data.log", delim="\n", delimMode="exclude") | batch(500, 180000) [ agentDFOSink("receiver", 35853), agentDFOSink("receiver2", 35853), agentDFOSink("receiver3", 35853) ];
> senderB: tail("data.log", delim="\n", delimMode="exclude") | batch(500, 180000) [ agentDFOSink("receiver", 35853), agentDFOSink("receiver2", 35853), agentDFOSink("receiver3", 35853) ];
> senderC: tail("data.log", delim="\n", delimMode="exclude") | batch(500, 180000) [ agentDFOSink("receiver", 35853), agentDFOSink("receiver2", 35853), agentDFOSink("receiver3", 35853) ];
> senderD: tail("data.log", delim="\n", delimMode="exclude") | batch(500, 180000) [ agentDFOSink("receiver", 35853), agentDFOSink("receiver2", 35853), agentDFOSink("receiver3", 35853) ];
> receiver: collectorSource(35853) | unbatch collectorSink("hdfs://namenode:9000/flume.log/%Y%m%d%H00", "receiver-", 10000);
> receiver2: collectorSource(35853) | unbatch collectorSink("hdfs://namenode:9000/flume.log/%Y%m%d%H00", "receiver2-", 10000);
> receiver3: collectorSource(35853) | unbatch collectorSink("hdfs://namenode:9000/flume.log/%Y%m%d%H00", "receiver3-", 10000);
>
> With this setup I run into java.lang.InterruptedExceptions in the
> collector-side RollSink, leading to the following stack trace:
>
> 2011-12-20 09:45:29,145 [Roll-TriggerThread-0] ERROR rolling.RollSink: RollSink interrupted
> java.lang.InterruptedException
>        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireNanos(AbstractQueuedSynchronizer.java:918)
>        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1224)
>        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.tryLock(ReentrantReadWriteLock.java:976)
>        at com.cloudera.flume.handlers.rolling.RollSink.rotate(RollSink.java:283)
>        at com.cloudera.flume.handlers.rolling.RollSink$TriggerThread.run(RollSink.java:149)
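>
> To make the first trace a bit more concrete: the Roll-TriggerThread is
> interrupted while it waits, with a timeout, for a write lock. A minimal
> standalone Java sketch of just that pattern (illustration only, names made
> up, not Flume's actual RollSink code):
>
>     import java.util.concurrent.TimeUnit;
>     import java.util.concurrent.locks.ReentrantReadWriteLock;
>
>     // Illustration only: the locking pattern the trace points at.
>     public class RotateLockSketch {
>         private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
>
>         void rotate(long timeoutMs) throws InterruptedException {
>             // tryLock(timeout, unit) throws InterruptedException if the waiting
>             // thread is interrupted before it acquires the lock or times out,
>             // which is the same path doAcquireNanos() reports above.
>             if (lock.writeLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
>                 try {
>                     // close the current output file and open the next one ...
>                 } finally {
>                     lock.writeLock().unlock();
>                 }
>             }
>         }
>     }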
>
> This seems to coincide with a second error on the same collector node:
>
> 2011-12-20 09:45:30,193 [logicalNode receiver3-19] INFO debug.InsistentAppendDecorator: Failed due to unexpected runtime exception during append attempt
> java.lang.RuntimeException: Blocked append interrupted by rotation event
>        at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:215)
>        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at com.cloudera.flume.core.MaskDecorator.append(MaskDecorator.java:43)
>        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at com.cloudera.flume.handlers.debug.InsistentOpenDecorator.append(InsistentOpenDecorator.java:169)
>        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at com.cloudera.flume.handlers.debug.StubbornAppendSink.append(StubbornAppendSink.java:71)
>        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at com.cloudera.flume.handlers.debug.InsistentAppendDecorator.append(InsistentAppendDecorator.java:110)
>        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at com.cloudera.flume.handlers.endtoend.AckChecksumChecker.append(AckChecksumChecker.java:113)
>        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at com.cloudera.flume.handlers.batch.UnbatchingDecorator.append(UnbatchingDecorator.java:62)
>        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at com.cloudera.flume.handlers.batch.GunzipDecorator.append(GunzipDecorator.java:81)
>        at com.cloudera.flume.collector.CollectorSink.append(CollectorSink.java:222)
>        at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>        at com.cloudera.flume.handlers.batch.UnbatchingDecorator.append(UnbatchingDecorator.java:74)
>        at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:110)
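>
> The exception message here looks like the append side of the same lock: the
> pumper thread blocks in append() while a rotation is in progress, gets
> interrupted, and the interrupt is rethrown as a RuntimeException, which is
> why InsistentAppendDecorator logs an "unexpected runtime exception". Again
> only a sketch of that shape with made-up names, not the real RollSink.append():
>
>     import java.util.concurrent.locks.ReentrantReadWriteLock;
>
>     // Illustration only: the append-side counterpart of the sketch above.
>     public class AppendInterruptSketch {
>         private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
>
>         public void append(Runnable doAppend) {
>             try {
>                 // Blocks while a rotation holds (or waits for) the write lock.
>                 lock.readLock().lockInterruptibly();
>             } catch (InterruptedException e) {
>                 Thread.currentThread().interrupt();
>                 // Surfaces to the decorators above as an unexpected runtime exception.
>                 throw new RuntimeException("Blocked append interrupted by rotation event", e);
>             }
>             try {
>                 doAppend.run();
>             } finally {
>                 lock.readLock().unlock();
>             }
>         }
>     }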
>
> I applied the patch mentioned in
> https://issues.apache.org/jira/browse/Flume-798 and experimented with
> different values for timeOut, but this did not solve the problem. In some
> cases I observed that a java.lang.OutOfMemoryError occurs before the first
> error appears:
>
> Exception in thread "pool-1-thread-3" java.lang.OutOfMemoryError: Java heap space
>        at org.apache.thrift.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:371)
>        at com.cloudera.flume.handlers.thrift.ThriftFlumeEvent.read(ThriftFlumeEvent.java:738)
>        at com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer$append_args.read(ThriftFlumeEventServer.java:531)
>        at com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer$Processor$append.process(ThriftFlumeEventServer.java:249)
>        at com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer$Processor.process(ThriftFlumeEventServer.java:240)
>        at org.apache.thrift.server.TSaneThreadPoolServer$WorkerProcess.run(TSaneThreadPoolServer.java:280)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>        at java.lang.Thread.run(Thread.java:662)
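>
> For scale (an assumption on my part about how a batch arrives, not something
> I have verified in the code): if a batch of 500 events at ~30 kB each travels
> as a single ~15 MB Thrift event body, then TBinaryProtocol.readBinary() has
> to allocate that body in one piece, and with four agents appending
> concurrently each worker thread in the pool can be holding such a batch plus
> whatever copies are made while it is unbatched further up the pipeline, so
> bursts of tens of MB of allocations against the 2500 MB heap are plausible.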
>
> However, the java.lang.RuntimeException in RollSink is sometimes thrown
> without either the java.lang.InterruptedException or the
> java.lang.OutOfMemoryError appearing. The problem seems to become more
> likely as the number of agents increases. Furthermore, it is not
> deterministic: in some runs it occurs at the very beginning, in some runs
> very late, and in some runs it does not happen at all. Any idea where the
> problem comes from? Is it the JVM running low on memory, or a thread timing
> problem? And any idea how to solve it? I'm thankful for any hints.
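>
> (For what it's worth, one generic way to check the memory side, using
> standard JVM flags and nothing Flume-specific (the dump path below is just a
> placeholder), would be to start the collector JVMs with
>
>     -verbose:gc -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/collector.hprof
>
> and see whether the heap is actually close to full when the first RollSink
> error shows up.)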
>
> Best regards,
>
> Dawid Rasinski
