flume-user mailing list archives

From Brock Noland <br...@cloudera.com>
Subject Re: Flume configuration fail-over problems
Date Tue, 16 Oct 2012 18:37:05 GMT
Hi,

Can you confirm that ElasticSearchSink is consuming events?

> Once this happens flume never resumes.

It doesn't deliver anything after that? Even minutes later it's still
doing nothing?

brock

On Tue, Oct 16, 2012 at 1:23 PM, Cameron Gandevia <cgandevia@gmail.com> wrote:
> Hi
>
> I am trying to set up a configuration where a single source agent load
> balances between two aggregate agents, which multiplex the flow to two
> end points. I don't think I have the channel capacity properly configured,
> but Flume always seems to end up hanging for me. If all channels are at
> capacity, should the source try to send again once they have emptied?
>
> Here is my configuration.
>
> #
> # Properties of memoryChannel
> #
> local_agent.channels.memoryChannel-1.type = memory
> local_agent.channels.memoryChannel-1.capacity = 100000
> local_agent.channels.memoryChannel-1.transactionCapacity = 1000
>
> collector_agent_1.channels.memoryChannel-1.type = memory
> collector_agent_1.channels.memoryChannel-1.capacity = 100000
> collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000
>
> collector_agent_1.channels.memoryChannel-2.type = memory
> collector_agent_1.channels.memoryChannel-2.capacity = 100000
> collector_agent_1.channels.memoryChannel-2.transactionCapacity = 1000
>
> collector_agent_2.channels.memoryChannel-1.type = memory
> collector_agent_2.channels.memoryChannel-1.capacity = 100000
> collector_agent_2.channels.memoryChannel-1.transactionCapacity = 1000
>
> collector_agent_2.channels.memoryChannel-2.type = memory
> collector_agent_2.channels.memoryChannel-2.capacity = 100000
> collector_agent_2.channels.memoryChannel-2.transactionCapacity = 1000
>
> #
> # Properties for spooling directory source
> #
> local_agent.sources.spooldir-1.type = spooldir
> local_agent.sources.spooldir-1.spoolDir = ~/flume_test/ready
> local_agent.sources.spooldir-1.fileHeader = true
> local_agent.sources.spooldir-1.channels = memoryChannel-1
>
> #
> # Properties for the avro sink 1 agent to collector 1
> #
> local_agent.sinks.avroSink-1.type = avro
> local_agent.sinks.avroSink-1.hostname = 127.0.0.1
> local_agent.sinks.avroSink-1.port = 4545
> local_agent.sinks.avroSink-1.channel = memoryChannel-1
>
> #
> # Properties for the avro sink agent to collector 2
> #
> local_agent.sinks.avroSink-2.type = avro
> local_agent.sinks.avroSink-2.hostname = 127.0.0.1
> local_agent.sinks.avroSink-2.port = 4546
> local_agent.sinks.avroSink-2.channel = memoryChannel-1
>
> #
> # Properties for the avro source collector 1
> #
> collector_agent_1.sources.avroSource-1.type = avro
> collector_agent_1.sources.avroSource-1.bind = 127.0.0.1
> collector_agent_1.sources.avroSource-1.port = 4545
> collector_agent_1.sources.avroSource-1.channels = memoryChannel-1 memoryChannel-2
>
> #
> # Properties for the avro source collector 2
> #
> collector_agent_2.sources.avroSource-2.type = avro
> collector_agent_2.sources.avroSource-2.bind = 127.0.0.1
> collector_agent_2.sources.avroSource-2.port = 4546
> collector_agent_2.sources.avroSource-2.channels = memoryChannel-1 memoryChannel-2
>
> # End points for collector 1
>
> # ElasticSearch endpoint collector 1
>
> collector_agent_1.sinks.elastic-search-sink-1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> collector_agent_1.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
> collector_agent_1.sinks.elastic-search-sink-1.clusterName = elasticsearch
> collector_agent_1.sinks.elastic-search-sink-1.batchSize = 10
> collector_agent_1.sinks.elastic-search-sink-1.channel = memoryChannel-1
>
> # HDFS endpoint collector 1
>
> collector_agent_1.sinks.sink1.type = hdfs
> collector_agent_1.sinks.sink1.hdfs.path = hdfs://hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test
> collector_agent_1.sinks.sink1.hdfs.fileType = DataStream
> collector_agent_1.sinks.sink1.hdfs.rollInterval = 300
> collector_agent_1.sinks.sink1.hdfs.rollSize = 0
> collector_agent_1.sinks.sink1.hdfs.rollCount = 0
> collector_agent_1.sinks.sink1.hdfs.batchSize = 1000
> collector_agent_1.sinks.sink1.txnEventMax = 1000
> collector_agent_1.sinks.sink1.serializer = avro_event
> collector_agent_1.sinks.sink1.channel = memoryChannel-2
>
> # ElasticSearch endpoint collector 2
>
> collector_agent_2.sinks.elastic-search-sink-1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> collector_agent_2.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
> collector_agent_2.sinks.elastic-search-sink-1.clusterName = elasticsearch
> collector_agent_2.sinks.elastic-search-sink-1.batchSize = 10
> collector_agent_2.sinks.elastic-search-sink-1.channel = memoryChannel-1
>
> # HDFS endpoint collector 2
>
> collector_agent_2.sinks.sink1.type = hdfs
> collector_agent_2.sinks.sink1.hdfs.path = hdfs://hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test_3
> collector_agent_2.sinks.sink1.hdfs.fileType = DataStream
> collector_agent_2.sinks.sink1.hdfs.rollInterval = 300
> collector_agent_2.sinks.sink1.hdfs.rollSize = 0
> collector_agent_2.sinks.sink1.hdfs.rollCount = 0
> collector_agent_2.sinks.sink1.hdfs.batchSize = 1000
> collector_agent_2.sinks.sink1.txnEventMax = 1000
> collector_agent_2.sinks.sink1.serializer = avro_event
> collector_agent_2.sinks.sink1.channel = memoryChannel-2
>
> # Specify priorities for the sinks on the agent
>
> local_agent.sinkgroups.ha.sinks = avroSink-1 avroSink-2
> local_agent.sinkgroups.ha.processor.type = failover
> local_agent.sinkgroups.ha.priority.avroSink-1 = 2
> local_agent.sinkgroups.ha.priority.avroSink-2 = 1
>
> # Wire the source agents up
>
> local_agent.sources = spooldir-1
> local_agent.sinks = avroSink-1 avroSink-2
> local_agent.sinkgroups = ha
> local_agent.channels = memoryChannel-1
>
> # Wire the collector agents up
>
> collector_agent_1.sources = avroSource-1
> collector_agent_1.sinks = elastic-search-sink-1 sink1
> collector_agent_1.channels = memoryChannel-1 memoryChannel-2
>
> collector_agent_2.sources = avroSource-2
> collector_agent_2.sinks = elastic-search-sink-1 sink1
> collector_agent_2.channels = memoryChannel-1 memoryChannel-2
>
> I will get the following exceptions on the collector nodes
>
> org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: memoryChannel-1}
> at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
> at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
> at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
> at org.apache.avro.ipc.Responder.respond(Responder.java:149)
> at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
> at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
> at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
> at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
> at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
> at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
> at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
> at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
> at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
> at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:91)
> at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> ... 28 more
>
> then the following exceptions on the agent
>
> 2012-10-16 10:51:02,942 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)] Sink avroSink-1 failed and has been sent to failover list
> org.apache.flume.EventDeliveryException: Failed to send events
> at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
> at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4545 }: Failed to send batch
> at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
> at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
> ... 3 more
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4545 }: Exception thrown from remote handler
> at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:318)
> at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
> at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
> ... 4 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: NettyTransceiver closed
> at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
> at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
> ... 6 more
> Caused by: java.io.IOException: NettyTransceiver closed
> at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:338)
> at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:59)
> at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:496)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:232)
> at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:98)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
> at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:404)
> at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:602)
> at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:358)
> at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
> at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> ... 1 more
> 2012-10-16 10:51:04,040 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:201)] Preparing to move file /home/cgandevia/Desktop/flume_test/ready/test.log.27 to /home/cgandevia/Desktop/flume_test/ready/test.log.27.COMPLETED
> 2012-10-16 10:51:07,119 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)] Sink avroSink-2 failed and has been sent to failover list
> org.apache.flume.EventDeliveryException: Failed to send events
> at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
> at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4546 }: Failed to send batch
> at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
> at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
> ... 3 more
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4546 }: Avro RPC call returned Status: FAILED
> at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:312)
> at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
> at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
> ... 4 more
> 2012-10-16 10:51:07,120 (SinkRunner-PollingRunner-FailoverSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: All sinks failed to process, nothing left to failover to
> at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:191)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:662)
> 2012-10-16 10:51:12,120 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink avroSink-1: Building RpcClient with hostname: 127.0.0.1, port: 4545
> 2012-10-16 10:51:12,188 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink avroSink-2: Building RpcClient with hostname: 127.0.0.1, port: 4546
>
> Once this happens flume never resumes.
>
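A side note on the quoted configuration: the goal described is load balancing between the two collectors, but the sink group uses `processor.type = failover`, which sends everything to the highest-priority sink until it fails. If round-robin distribution is actually what's wanted, a sketch of a load-balancing sink group follows. Property names are taken from the Flume 1.x user guide for the `load_balance` processor; verify them against the deployed Flume version before use.

```properties
# Sketch: distribute events across both collectors instead of failing over.
local_agent.sinkgroups = lb
local_agent.sinkgroups.lb.sinks = avroSink-1 avroSink-2
# load_balance replaces failover; no per-sink priorities are needed.
local_agent.sinkgroups.lb.processor.type = load_balance
local_agent.sinkgroups.lb.processor.selector = round_robin
# With backoff enabled, a failed sink is temporarily blacklisted
# instead of being retried on every batch.
local_agent.sinkgroups.lb.processor.backoff = true
```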


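The collector-side "Space for commit to queue couldn't be acquired" error means the memory channel filled up because the sinks (ElasticSearch with batchSize 10, in particular) drained it more slowly than the Avro source filled it. A sketch of channel tuning that gives the sinks more headroom, using the standard memory channel properties from the Flume 1.x user guide (the exact numbers are illustrative, not recommendations):

```properties
# Sketch: more buffer space and a longer wait before a put gives up.
collector_agent_1.channels.memoryChannel-1.type = memory
collector_agent_1.channels.memoryChannel-1.capacity = 1000000
collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000
# keep-alive is the number of seconds a put/take waits for space
# before throwing ChannelException (default is 3).
collector_agent_1.channels.memoryChannel-1.keep-alive = 30
```

Raising the ElasticSearch sink's `batchSize` from 10 toward the source's batch size would also help the sink keep up.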

-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
