From: Brock Noland <brock@cloudera.com>
Date: Tue, 16 Oct 2012 13:37:05 -0500
Subject: Re: Flume configuration fail-over problems
To: user@flume.apache.org

Hi,

Can you confirm that ElasticSearchSink is consuming events?

> Once this happens flume never resumes.

It doesn't deliver anything after that? Even minutes later, is it still doing nothing?

brock

On Tue, Oct 16, 2012 at 1:23 PM, Cameron Gandevia wrote:
> Hi
>
> I am trying to set up a configuration where a single source agent load
> balances between two aggregate agents, which multiplex the flow to two
> end points. I don't think I have the channel capacity properly
> configured, but Flume always seems to end up hanging for me. If all
> channels are at capacity, should the source try to send again once they
> have emptied?
>
> Here is my configuration.
>
> #
> # Properties of memoryChannel
> #
> local_agent.channels.memoryChannel-1.type = memory
> local_agent.channels.memoryChannel-1.capacity = 100000
> local_agent.channels.memoryChannel-1.transactionCapacity = 1000
>
> collector_agent_1.channels.memoryChannel-1.type = memory
> collector_agent_1.channels.memoryChannel-1.capacity = 100000
> collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000
>
> collector_agent_1.channels.memoryChannel-2.type = memory
> collector_agent_1.channels.memoryChannel-2.capacity = 100000
> collector_agent_1.channels.memoryChannel-2.transactionCapacity = 1000
>
> collector_agent_2.channels.memoryChannel-1.type = memory
> collector_agent_2.channels.memoryChannel-1.capacity = 100000
> collector_agent_2.channels.memoryChannel-1.transactionCapacity = 1000
>
> collector_agent_2.channels.memoryChannel-2.type = memory
> collector_agent_2.channels.memoryChannel-2.capacity = 100000
> collector_agent_2.channels.memoryChannel-2.transactionCapacity = 1000
>
> #
> # Properties for spooling directory source
> #
> local_agent.sources.spooldir-1.type = spooldir
> local_agent.sources.spooldir-1.spoolDir = ~/flume_test/ready
> local_agent.sources.spooldir-1.fileHeader = true
> local_agent.sources.spooldir-1.channels = memoryChannel-1
>
> #
> # Properties for the avro sink 1 agent to collector 1
> #
> local_agent.sinks.avroSink-1.type = avro
> local_agent.sinks.avroSink-1.hostname = 127.0.0.1
> local_agent.sinks.avroSink-1.port = 4545
> local_agent.sinks.avroSink-1.channel = memoryChannel-1
>
> #
> # Properties for the avro sink agent to collector 2
> #
> local_agent.sinks.avroSink-2.type = avro
> local_agent.sinks.avroSink-2.hostname = 127.0.0.1
> local_agent.sinks.avroSink-2.port = 4546
> local_agent.sinks.avroSink-2.channel = memoryChannel-1
>
> #
> # Properties for the avro source collector 1
> #
> collector_agent_1.sources.avroSource-1.type = avro
> collector_agent_1.sources.avroSource-1.bind = 127.0.0.1
> collector_agent_1.sources.avroSource-1.port = 4545
> collector_agent_1.sources.avroSource-1.channels = memoryChannel-1 memoryChannel-2
>
> #
> # Properties for the avro source collector 2
> #
> collector_agent_2.sources.avroSource-2.type = avro
> collector_agent_2.sources.avroSource-2.bind = 127.0.0.1
> collector_agent_2.sources.avroSource-2.port = 4546
> collector_agent_2.sources.avroSource-2.channels = memoryChannel-1 memoryChannel-2
>
> # End points for collector 1
>
> # ElasticSearch endpoint collector 1
>
> collector_agent_1.sinks.elastic-search-sink-1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> collector_agent_1.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
> collector_agent_1.sinks.elastic-search-sink-1.clusterName = elasticsearch
> collector_agent_1.sinks.elastic-search-sink-1.batchSize = 10
> collector_agent_1.sinks.elastic-search-sink-1.channel = memoryChannel-1
>
> # HDFS endpoint collector 1
>
> collector_agent_1.sinks.sink1.type = hdfs
> collector_agent_1.sinks.sink1.hdfs.path = hdfs://hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test
> collector_agent_1.sinks.sink1.hdfs.fileType = DataStream
> collector_agent_1.sinks.sink1.hdfs.rollInterval = 300
> collector_agent_1.sinks.sink1.hdfs.rollSize = 0
> collector_agent_1.sinks.sink1.hdfs.rollCount = 0
> collector_agent_1.sinks.sink1.hdfs.batchSize = 1000
> collector_agent_1.sinks.sink1.txnEventMax = 1000
> collector_agent_1.sinks.sink1.serializer = avro_event
> collector_agent_1.sinks.sink1.channel = memoryChannel-2
>
> # ElasticSearch endpoint collector 2
>
> collector_agent_2.sinks.elastic-search-sink-1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> collector_agent_2.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
> collector_agent_2.sinks.elastic-search-sink-1.clusterName = elasticsearch
> collector_agent_2.sinks.elastic-search-sink-1.batchSize = 10
> collector_agent_2.sinks.elastic-search-sink-1.channel = memoryChannel-1
>
> # HDFS endpoint collector 2
>
> collector_agent_2.sinks.sink1.type = hdfs
> collector_agent_2.sinks.sink1.hdfs.path = hdfs://hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test_3
> collector_agent_2.sinks.sink1.hdfs.fileType = DataStream
> collector_agent_2.sinks.sink1.hdfs.rollInterval = 300
> collector_agent_2.sinks.sink1.hdfs.rollSize = 0
> collector_agent_2.sinks.sink1.hdfs.rollCount = 0
> collector_agent_2.sinks.sink1.hdfs.batchSize = 1000
> collector_agent_2.sinks.sink1.txnEventMax = 1000
> collector_agent_2.sinks.sink1.serializer = avro_event
> collector_agent_2.sinks.sink1.channel = memoryChannel-2
>
> # Specify priorities for the sinks on the agent
>
> local_agent.sinkgroups.ha.sinks = avroSink-1 avroSink-2
> local_agent.sinkgroups.ha.processor.type = failover
> local_agent.sinkgroups.ha.priority.avroSink-1 = 2
> local_agent.sinkgroups.ha.priority.avroSink-2 = 1
>
> # Wire the source agents up
>
> local_agent.sources = spooldir-1
> local_agent.sinks = avroSink-1 avroSink-2
> local_agent.sinkgroups = ha
> local_agent.channels = memoryChannel-1
>
> # Wire the collector agents up
>
> collector_agent_1.sources = avroSource-1
> collector_agent_1.sinks = elastic-search-sink-1 sink1
> collector_agent_1.channels = memoryChannel-1 memoryChannel-2
>
> collector_agent_2.sources = avroSource-2
> collector_agent_2.sinks = elastic-search-sink-1 sink1
> collector_agent_2.channels = memoryChannel-1 memoryChannel-2
>
> I will get the following exceptions on the collector nodes:
>
> org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: memoryChannel-1}
>   at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>   at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
>   at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
>   at org.apache.avro.ipc.Responder.respond(Responder.java:149)
>   at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>   at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
>   at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>   at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>   at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
>   at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)
>   at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>   at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
>   at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
>   at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
>   at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>   at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>   at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
>   at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
>   at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
>   at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
>   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
>   at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>   at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>   at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
>   at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:91)
>   at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>   at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>   ... 28 more
>
> then the following exceptions on the agent:
>
> 2012-10-16 10:51:02,942 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)] Sink avroSink-1 failed and has been sent to failover list
> org.apache.flume.EventDeliveryException: Failed to send events
>   at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
>   at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>   at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4545 }: Failed to send batch
>   at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
>   at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
>   ... 3 more
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4545 }: Exception thrown from remote handler
>   at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:318)
>   at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
>   at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
>   ... 4 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: NettyTransceiver closed
>   at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
>   at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
>   ... 6 more
> Caused by: java.io.IOException: NettyTransceiver closed
>   at org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:338)
>   at org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:59)
>   at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:496)
>   at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>   at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
>   at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
>   at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:232)
>   at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:98)
>   at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>   at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>   at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:404)
>   at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:602)
>   at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:358)
>   at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
>   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
>   at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>   at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>   ... 1 more
> 2012-10-16 10:51:04,040 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:201)] Preparing to move file /home/cgandevia/Desktop/flume_test/ready/test.log.27 to /home/cgandevia/Desktop/flume_test/ready/test.log.27.COMPLETED
> 2012-10-16 10:51:07,119 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)] Sink avroSink-2 failed and has been sent to failover list
> org.apache.flume.EventDeliveryException: Failed to send events
>   at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
>   at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>   at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4546 }: Failed to send batch
>   at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
>   at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
>   ... 3 more
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4546 }: Avro RPC call returned Status: FAILED
>   at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:312)
>   at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
>   at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
>   ... 4 more
> 2012-10-16 10:51:07,120 (SinkRunner-PollingRunner-FailoverSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: All sinks failed to process, nothing left to failover to
>   at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:191)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>   at java.lang.Thread.run(Thread.java:662)
> 2012-10-16 10:51:12,120 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink avroSink-1: Building RpcClient with hostname: 127.0.0.1, port: 4545
> 2012-10-16 10:51:12,188 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink avroSink-2: Building RpcClient with hostname: 127.0.0.1, port: 4546
>
> Once this happens flume never resumes.

--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
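
[Editor's note, not part of the original thread] The message describes load balancing between the two collectors, but the sink group above is configured with the failover processor, and its priorities are set as `sinkgroups.ha.priority.<sink>` rather than on the processor. A minimal sketch of a sink group matching the stated intent, assuming Flume 1.x property names from the user guide (adjust for your version):

```properties
# Sketch: load-balance local_agent's events across both collectors,
# matching the "load balances between two aggregate agents" description.
local_agent.sinkgroups = ha
local_agent.sinkgroups.ha.sinks = avroSink-1 avroSink-2
local_agent.sinkgroups.ha.processor.type = load_balance
local_agent.sinkgroups.ha.processor.selector = round_robin

# If failover (primary/backup) is actually intended, the priorities
# belong on the processor:
# local_agent.sinkgroups.ha.processor.type = failover
# local_agent.sinkgroups.ha.processor.priority.avroSink-1 = 2
# local_agent.sinkgroups.ha.processor.priority.avroSink-2 = 1
```

The "Space for commit to queue couldn't be acquired" error itself indicates the collectors' memory channels filled because the sinks (here, ElasticSearchSink with batchSize = 10 versus the HDFS sink's 1000) drain more slowly than the Avro source fills; raising the slow sink's batch size or channel capacity is the usual tuning lever.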