flume-user mailing list archives

From 周梦想 <abloz...@gmail.com>
Subject Re: Take list for MemoryTransaction, capacity 100 full?
Date Tue, 26 Feb 2013 07:41:04 GMT
I found that if agent46.sinks.myhdfssink.hdfs.batchSize >= 100, Flume reports
this error.
If I set it to 10, it works, but it is a bit slower.
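
A possible cause, judging from the quoted hdfs.conf further down: the channel
property is spelled transactionCapactiy there, so Flume would most likely
ignore it and fall back to the memory channel's default transactionCapacity
of 100, which matches the "capacity 100" in the exception. A minimal sketch of
corrected channel and sink settings, assuming that is the cause (the values
are illustrative and have not been tested on this setup):

```properties
# Memory channel: total number of events the channel can buffer.
agent46.channels.memch1.capacity = 10000
# Maximum events per transaction. Note the spelling "transactionCapacity".
# Assumed value: chosen to be at least as large as the sink's hdfs.batchSize.
agent46.channels.memch1.transactionCapacity = 1000

# HDFS sink: number of events taken from the channel per transaction.
agent46.sinks.myhdfssink.hdfs.batchSize = 1000
```

With the transaction capacity no smaller than the sink batch size, the sink
should be able to take a full batch within one channel transaction.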

Best Regards,
Andy

2013/2/26 周梦想 <ablozhou@gmail.com>

> more logs:
>
> 2013-02-26 14:37:00,380 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
> to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException:
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Take list for
> MemoryTransaction, capacity 100 full, consider committing more frequently,
> increasing capacity, or increasing thread count
>          at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>         ... 3 more
> 2013-02-26 14:37:02,854 (pool-7-thread-1) [ERROR -
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
> source userlogsrc: Unable to process event batch. Exception follows.
> org.apache.flume.ChannelException: Unable to put batch on required
> channel: org.apache.flume.channel.MemoryChannel{name: memch1}
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>         at
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
>         at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at
> org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:149)
>         at
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
>         at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
>         at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
>         at
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue
> couldn't be acquired Sinks are likely not keeping up with sources, or the
> buffer size is too tight
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>         ... 28 more
>
>
> 2013/2/26 周梦想 <ablozhou@gmail.com>
>
>> Hello,
>> I am using flume-ng to send data from Windows to HDFS on Linux over the
>> Avro protocol, and I encountered this error:
>>
>> 2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG -
>> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
>> source userlogsrc: Received avro event batch of 100 events.
>>
>> 2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor)
>> [ERROR -
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)]
>> process failed
>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>> capacity 100 full, consider committing more frequently, increasing
>> capacity, or increasing thread count
>>         at
>> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>>         at
>> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>>         at
>> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>>         at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:722)
>> Caused by: org.apache.flume.ChannelException: Space for commit to queue
>> couldn't be acquired Sinks are likely not keeping up with sources, or the
>> buffer size is too tight
>>         at
>> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>>         at
>> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>>         at
>> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>>         ... 28 more
>>
>> I have set the memory channel capacity to 1000, but it still reports this
>> error.
>> Can someone give me any advice?
>>
>> Thanks,
>> Andy
>>
>> hdfs.conf:
>>
>> agent46.sources = userlogsrc gamelogsrc
>> agent46.channels = memch1
>> agent46.sinks = myhdfssink
>>
>> #channels:
>> agent46.channels.memch1.type = memory
>> agent46.channels.memch1.capacity = 10000
>> agent46.channels.memch1.transactionCapactiy = 100
>> #sources:
>> #userlogsrc:
>> #agent46.sources.userlogsrc.type = syslogTcp
>> agent46.sources.userlogsrc.type = avro
>> agent46.sources.userlogsrc.port = 5140
>> agent46.sources.userlogsrc.bind= 0.0.0.0
>> #agent46.sources.userlogsrc.host= hadoop48
>> agent46.sources.userlogsrc.interceptors = i1 i2 i3
>> agent46.sources.userlogsrc.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
>> agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
>> #agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
>> agent46.sources.userlogsrc.interceptors.i1.useIP = false
>> agent46.sources.userlogsrc.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>> agent46.sources.userlogsrc.interceptors.i3.type = static
>> agent46.sources.userlogsrc.interceptors.i3.key = datacenter
>> agent46.sources.userlogsrc.interceptors.i3.value = userdata
>> agent46.sources.userlogsrc.channels = memch1
>> #gamelogsrc:
>> #agent46.sources.gamelogsrc.type = syslogTcp
>> agent46.sources.gamelogsrc.type = avro
>> agent46.sources.gamelogsrc.port = 5150
>> agent46.sources.gamelogsrc.bind= 0.0.0.0
>> agent46.sources.gamelogsrc.channels = memch1
>> #sinks:
>> agent46.sinks.myhdfssink.channel = memch1
>>
>> agent46.sinks.myhdfssink.type = hdfs
>> agent46.sinks.myhdfssink.hdfs.rollInterval = 120
>> agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
>> agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
>> agent46.sinks.myhdfssink.hdfs.rollCount = 600000
>> agent46.sinks.myhdfssink.hdfs.batchSize = 1000
>> agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
>> agent46.sinks.myhdfssink.hdfs.threadsPoolSize= 100
>> agent46.sinks.myhdfssink.hdfs.path = hdfs://h46:9000/flume/%{filename}/%m%d
>> #agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
>> #agent46.sinks.myhdfssink.hdfs.filePrefix =
>> %{filename}.%{hostname}.%{datacenter}.%Y%m%d
>> agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
>> #agent46.sinks.myhdfssink.hdfs.rollInterval = 60
>> #agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
>> agent46.sinks.myhdfssink.hdfs.fileType = DataStream
>> #agent46.sinks.myhdfssink.hdfs.file.writeFormat= Text
>>
>>
>>
>
