flume-user mailing list archives

From: Cameron Wellock <cameron.well...@nrelate.com>
Subject: Unable to put batch on required channel
Date: Thu, 26 Sep 2013 15:36:32 GMT
Hello world,

I've been trying to set up a test instance of Flume and have been stymied
by recurring failures. I'm using a single Flume agent to move about 200G of
data from a spooldir into a very small Hadoop cluster (3 nodes). If Flume
is the only thing writing to HDFS, everything works fine, but as soon as
another application starts writing data into the cluster, HDFS slows down
and Flume barfs with an "unable to put batch on required channel" exception.

I have tried all kinds of configuration changes, to no avail: memory
channels, file channels, small batch sizes (down to 50), large batch sizes
(up to 20,000), increased timeouts, increased channel capacity (up to 150
million events), you name it. Sooner or later (usually 5-10 minutes after a
restart) Flume grinds to a halt. This is especially vexing considering that
it's copying from a file to a file--there are no realtime requirements that
might reasonably lead to a full channel in other circumstances. Anybody
have any advice? Insights? Wild guesses? Outright lies?
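
For concreteness, here's roughly what one of the memory-channel variants
looked like (values are illustrative, pulled from my notes; keep-alive is,
as I understand the docs, how many seconds a put will block waiting for
space before giving up):

# illustrative memory-channel variant, not a recommendation
agent.channels.c1.type = memory
agent.channels.c1.capacity = 150000000
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.keep-alive = 30

# one of the small-batch runs, at both the source and the sink
agent.sources.r1.batchSize = 50
agent.sinks.k1.hdfs.batchSize = 50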

Below are two exceptions from the log, one from a memory-channel
configuration and one from a file-channel configuration; below those is the
most recent configuration file used. Absolutely any suggestions would be
appreciated.

Thanks,
Cameron


25 Sep 2013 21:05:12,262 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195)  - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    ... 9 more


25 Sep 2013 22:18:37,672 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:195)  - FATAL: Spool Directory source r1: { spoolDir: /var/nrelate/flume-spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel c1 { dataDirs: [/var/lib/flume-ng/.flume/file-channel/data] }
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:189)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.ChannelException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=c1]
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:468)
    at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
    at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
    ... 9 more




# define the pipeline parts --------------------------------------------------

agent.sources = r1
agent.sinks = k1
agent.channels = c1

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

# the main source, a spooldir ------------------------------------------------

agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /var/intheworld/flume-spool
agent.sources.r1.batchSize = 10000
agent.sources.r1.deserializer.maxLineLength = 10000
agent.sources.r1.interceptors = i1 i2

# parse out the timestamp and add to header
agent.sources.r1.interceptors.i1.type = regex_extractor
agent.sources.r1.interceptors.i1.regex = ^.*\\"ts\\":(\\d+).*$
agent.sources.r1.interceptors.i1.serializers = s1
agent.sources.r1.interceptors.i1.serializers.s1.name = timestamp

# also set host (hostname doesn't work properly, so set explicitly)
agent.sources.r1.interceptors.i2.type = static
agent.sources.r1.interceptors.i2.key = host
agent.sources.r1.interceptors.i2.value = Ess003726
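# (this header is what %{host} in the sink's filePrefix below expands to)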

# the sink, HDFS --------------------------------------------------------------

agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://a.host.in.the.world.com/events/raw/%Y-%m-%d
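# (the %Y-%m-%d escapes in the path rely on the timestamp header set by
# interceptor i1 above)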
agent.sinks.k1.hdfs.filePrefix = %{host}
agent.sinks.k1.hdfs.rollInterval = 0
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
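# (all three roll triggers are 0, which disables time/size/count-based
# rolling; if I read the docs right, files then only get closed by the
# idleTimeout below)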
agent.sinks.k1.hdfs.batchSize = 10000
agent.sinks.k1.hdfs.txnEventMax = 10000
agent.sinks.k1.hdfs.idleTimeout = 900
agent.sinks.k1.hdfs.callTimeout = 300000
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text

# the channel ------------------------------------------------------------------

agent.channels.c1.type = file
agent.channels.c1.capacity = 150000000
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.write-timeout = 360
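# (capacity here is counted in events; if I understand the docs, the file
# channel preallocates its checkpoint file in proportion to capacity, so
# 150 million makes for a very large checkpoint)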
