flume-user mailing list archives

From "Hari Shreedharan" <hshreedha...@cloudera.com>
Subject Re: Flume 1.5.0 hdfs sink stuck with channel is full messages
Date Fri, 17 Oct 2014 15:18:19 GMT
I think we should get rid of the sfWriters lock and just use synchronized blocks everywhere.
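
To make the lock ordering concrete, here is a minimal Java sketch of one way to keep the two close paths from taking locks in opposite orders. All class and method names are hypothetical; this is not the actual HDFSEventSink or BucketWriter code. The eviction path removes the eldest writer while holding the map monitor, but closes it only after releasing that monitor, so no thread ever waits for a writer while holding the map. This is just one possible shape for a fix, not necessarily what would go into Flume.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative LRU cache of writers; names are assumptions, not Flume's API. */
public class WriterCacheSketch {

    /** Hypothetical stand-in for BucketWriter. */
    static class Writer {
        private final String path;
        private final WriterCacheSketch cache;
        private boolean closed;

        Writer(String path, WriterCacheSketch cache) {
            this.path = path;
            this.cache = cache;
        }

        // Idle-timeout path: holds the writer monitor, then takes the map monitor.
        // Lock order here is writer -> map.
        synchronized void closeFromIdleTimeout() {
            closed = true;
            cache.remove(path);
        }

        // Eviction path: called with NO map monitor held, so it needs only
        // the writer monitor and can never complete a lock cycle.
        synchronized void closeFromEviction() {
            closed = true;
        }

        synchronized boolean isClosed() {
            return closed;
        }
    }

    private final int maxOpenFiles;
    // Access-ordered map: iteration starts at the least recently used entry.
    private final Map<String, Writer> writers = new LinkedHashMap<>(16, 0.75f, true);

    WriterCacheSketch(int maxOpenFiles) {
        this.maxOpenFiles = maxOpenFiles;
    }

    void remove(String path) {
        synchronized (writers) {
            writers.remove(path);
        }
    }

    int size() {
        synchronized (writers) {
            return writers.size();
        }
    }

    Writer getOrCreate(String path) {
        List<Writer> evicted = new ArrayList<>();
        Writer writer;
        synchronized (writers) {
            writer = writers.get(path);
            if (writer == null) {
                writer = new Writer(path, this);
                writers.put(path, writer);
            }
            // Detach the eldest entries under the map monitor, but do not close them yet.
            Iterator<Writer> it = writers.values().iterator();
            while (writers.size() > maxOpenFiles && it.hasNext()) {
                evicted.add(it.next());
                it.remove();
            }
        }
        // Map monitor released: closing may now block on a writer monitor safely.
        for (Writer old : evicted) {
            old.closeFromEviction();
        }
        return writer;
    }
}
```

With this arrangement the idle-timeout thread can still hold a writer and wait for the map, but whoever holds the map never waits for a writer, so the cycle seen in the jstack output below cannot form.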



Thanks,
Hari

On Mon, Jul 28, 2014 at 3:04 PM, Viral Bajaria <viral.bajaria@gmail.com>
wrote:

> I found the issue.
> The deadlock happens because I have 3 settings that can each close a file:
> 1) maxOpenFiles: set to 500
> 2) max file size (rollSize): set to 128MB
> 3) idleTimeout: set to 30 seconds
> The race condition occurs when the same BucketWriter is being evicted via
> both maxOpenFiles and idleTimeout. The two use separate code paths, and
> each takes the sfWritersLock and the BucketWriter monitor in the opposite
> order.
> The idleTimeout thread kicks in and calls close(true) on the BucketWriter,
> which is a synchronized method. This in turn calls the closeCallBack,
> which has a synchronized block to remove an entry from the sfWriters
> LinkedHashMap. It now waits on sfWritersLock there.
> In the meantime, the sink thread tried to add an entry to the
> LinkedHashMap, after acquiring the sfWritersLock. The LinkedHashMap had
> reached its limit of 500, so removeEldestEntry got called, which in turn
> called the close() method on BucketWriter, which calls close(false).
> Because the idleTimeout thread is already inside that synchronized method,
> the sink thread cannot enter it and so it blocks. But since the sink
> thread already holds the sfWritersLock, the idleTimeout thread is blocked
> on it in turn.
> This results in a deadlock.
> For now I have removed the idleTimeout, since a file will eventually get
> closed due to the LRU, but this is a known issue.
> Should I open a JIRA for this?
> Thanks,
> Viral
> On Sun, Jul 27, 2014 at 7:25 PM, Viral Bajaria <viral.bajaria@gmail.com>
> wrote:
>> Hi,
>>
>> I am using 1.5.0 with the hdfs sink to write files to S3. The process
>> writes fine for a while, but eventually I start getting the following
>> messages:
>>
>> INFO  [pool-5-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents:224) - Last read was never committed - resetting mark position.
>> WARN  [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:238) - The channel is full, and cannot write data now. The source will try again after 4000 milliseconds
>>
>> I looked into the logs and I don't see any errors besides the "channel is
>> full" warning.
>>
>> I saw that the number of open files for writing to HDFS is 1 shy of the
>> max count (500) and that's when this issue starts every single time.
>>
>> Following are my configuration settings:
>>
>> *BEGIN CONFIG*
>> agent.sources = spooldir
>> agent.channels = memoryChannel
>> agent.sinks = s3Sink
>>
>> agent.sources.spooldir.channels = memoryChannel
>> agent.sources.spooldir.type = spooldir
>> agent.sources.spooldir.spoolDir = <spool_directory>
>> agent.sources.spooldir.deserializer.maxLineLength = 4096
>> agent.sources.spooldir.decodeErrorPolicy = IGNORE
>>
>> #s3 sink
>> agent.sinks.s3Sink.channel = memoryChannel
>> agent.sinks.s3Sink.type = hdfs
>> agent.sinks.s3Sink.hdfs.rollInterval = 0
>> agent.sinks.s3Sink.hdfs.rollSize = 134217728
>> agent.sinks.s3Sink.hdfs.rollCount = 0
>> agent.sinks.s3Sink.hdfs.batchSize = 1000
>> agent.sinks.s3Sink.hdfs.maxOpenFiles = 500
>> agent.sinks.s3Sink.hdfs.idleTimeout = 30
>> agent.sinks.s3Sink.hdfs.codeC = gzip
>> agent.sinks.s3Sink.hdfs.writeFormat = Text
>> agent.sinks.s3Sink.hdfs.path = <s3_path>
>> #hdfs sink uses java.util.Time and needs the long timezone format
>> agent.sinks.s3Sink.hdfs.timeZone = America/New_York
>>
>>
>> agent.channels.memoryChannel.type = memory
>> agent.channels.memoryChannel.capacity = 100000
>> agent.channels.memoryChannel.transactionCapacity = 1000
>>
>> *END CONFIG*
>>
>> Out of curiosity, I did a jstack on the process and it says there is a
>> deadlock:
>>
>> Java stack information for the threads listed above:
>> ===================================================
>> "hdfs-s3Sink-roll-timer-0":
>>         at org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:404)
>>         - waiting to lock <0x000000077ae51798> (a java.lang.Object)
>>         at org.apache.flume.sink.hdfs.BucketWriter.runCloseAction(BucketWriter.java:487)
>>         at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:448)
>>         - locked <0x0000000781f08ec0> (a org.apache.flume.sink.hdfs.BucketWriter)
>>         at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:472)
>>         at org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:467)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> "SinkRunner-PollingRunner-DefaultSinkProcessor":
>>         at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:331)
>>         - waiting to lock <0x0000000781f08ec0> (a org.apache.flume.sink.hdfs.BucketWriter)
>>         at org.apache.flume.sink.hdfs.HDFSEventSink$WriterLinkedHashMap.removeEldestEntry(HDFSEventSink.java:175)
>>         at java.util.LinkedHashMap.addEntry(LinkedHashMap.java:431)
>>         at java.util.HashMap.put(HashMap.java:509)
>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:415)
>>         - locked <0x000000077ae51798> (a java.lang.Object)
>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Found 1 deadlock.
>>
>> Is the deadlock the reason the channel gets choked and stops moving
>> forward?
>>
>> Thanks,
>> Viral
>>
>>
>>