flume-user mailing list archives

From Juhani Connolly <juhani_conno...@cyberagent.co.jp>
Subject Re: Flume configuration fail-over problems
Date Thu, 18 Oct 2012 08:10:20 GMT
Hi Cameron, I had a look at your logs; here's what appears to be happening:

- It looks like your first collector either isn't sinking events or is 
just too slow at it.
- By the look of the logs, the second one *is* delivering events. The 
failover processor doesn't actually log anything when nothing is wrong. 
Perhaps adding some logging to 
FailoverSinkProcessor.moveActiveToDeadAndGetNext() would make this more 
user friendly (see the sketch after this list). Could you check whether 
events were actually delivered by collector 2?
- The JSON output for avro-sink2 looks like it is successfully sending 
data. What does the output look like for the second collector?
- I'd recommend setting up Ganglia to verify the throughput on each 
component and see if there's an imbalance. Those JSON stats frankly look 
a bit odd. The channels don't look full; if they were, that would 
suggest the batch size is greater than the transaction capacity. That 
doesn't appear to be the case in the configuration either, unless we 
have a bug expecting batchSize < transactionCapacity rather than <=. 
Some batch sizes aren't specified (so the defaults are used); I'm pretty 
sure none of the defaults are > 1000, but you might want to check that 
just to be safe (see the config sketch after this list).
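
For the logging suggestion, something along these lines is what I have 
in mind. This is only an illustrative sketch, not the actual body of 
FailoverSinkProcessor.moveActiveToDeadAndGetNext(); the placeholder 
comments and names stand in for the existing logic:

    // Sketch only: shows where a log line could make failover visible.
    // "failedSink" / "nextSink" are placeholders for the real fields in
    // org.apache.flume.sink.FailoverSinkProcessor.
    private Sink moveActiveToDeadAndGetNext() {
        // ... existing logic that demotes the failed sink ...
        logger.warn("Sink {} failed, failing over to the next sink by priority",
            failedSink.getName());
        // ... existing logic that selects the next live sink ...
        return nextSink;
    }

As for the batch sizes, you can rule that theory out by pinning them 
explicitly at or below the channel's transactionCapacity. Assuming the 
avro sink's batch-size property (default 100), something like:

    # transactionCapacity is 1000 in the config below, so any
    # batch size <= 1000 is safe; 100 is the documented default.
    local_agent.sinks.avroSink-1.batch-size = 100
    local_agent.sinks.avroSink-2.batch-size = 100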

So if this isn't resolved: check whether collector 2 is receiving 
events; if not, try changing the transaction capacity to be greater 
than the batch size. If there's still a problem, thread dumps from all 
three agents would help a lot, and if you can, have a look at the stats 
from Ganglia (commands for both are sketched below).
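
For the thread dumps, plain jstack against each agent's JVM is enough 
(jps lists the running Flume processes; the file names are just 
examples):

    jps -l                          # find the PID of each Flume agent
    jstack <pid> > local_agent.txt  # repeat for all three agents

And depending on your Flume version, you can start an agent with the 
monitoring flags from the user guide to get Ganglia or HTTP/JSON 
counters (host names and ports here are placeholders):

    bin/flume-ng agent -n local_agent -c conf -f flume.conf \
        -Dflume.monitoring.type=ganglia \
        -Dflume.monitoring.hosts=gangliahost:8649

    # or, for the JSON stats over HTTP:
    bin/flume-ng agent -n local_agent -c conf -f flume.conf \
        -Dflume.monitoring.type=http \
        -Dflume.monitoring.port=34545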

On 10/17/2012 04:53 AM, Cameron Gandevia wrote:
> Hey
>
> Thanks for the reply. I think the problem is in the Spooling Directory 
> Source. I am not sure it recovers properly; once the failure happens, I 
> don't see its thread pool running in the dump. I have added a question 
> to the code review which may be related, but I don't understand how the 
> sources are restarted during failures well enough to know if it's the 
> problem for sure. I will keep looking and post anything I find.
>
> Thanks
>
> On Tue, Oct 16, 2012 at 12:45 PM, Brock Noland <brock@cloudera.com> wrote:
>
>     With the configuration below, I was able to kill collector1, see
>     collector2 take over and reverse.
>
>     Note that I had to decrease the capacity of the channels significantly
>     because I was using a smaller heap size. Before decreasing the
>     capacity, a few threads in the local_agent ran out of memory and then
>     behaved like you explained.
>
>     Brock
>
>     #
>     # Properties of memoryChannel
>     #
>     local_agent.channels.memoryChannel-1.type = memory
>     local_agent.channels.memoryChannel-1.capacity = 100000
>     local_agent.channels.memoryChannel-1.transactionCapacity = 1000
>
>     collector_agent_1.channels.memoryChannel-1.type = memory
>     collector_agent_1.channels.memoryChannel-1.capacity = 100000
>     collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000
>
>     collector_agent_1.channels.memoryChannel-2.type = memory
>     collector_agent_1.channels.memoryChannel-2.capacity = 100000
>     collector_agent_1.channels.memoryChannel-2.transactionCapacity = 1000
>
>     collector_agent_2.channels.memoryChannel-1.type = memory
>     collector_agent_2.channels.memoryChannel-1.capacity = 100000
>     collector_agent_2.channels.memoryChannel-1.transactionCapacity = 1000
>
>     collector_agent_2.channels.memoryChannel-2.type = memory
>     collector_agent_2.channels.memoryChannel-2.capacity = 100000
>     collector_agent_2.channels.memoryChannel-2.transactionCapacity = 1000
>
>     #
>     # Properties for the spooling directory source (a seq source
>     # stands in for it in this test)
>     #
>     local_agent.sources.spooldir-1.type = seq
>     local_agent.sources.spooldir-1.channels = memoryChannel-1
>
>     #
>     # Properties for the avro sink 1 agent to collector 1
>     #
>     local_agent.sinks.avroSink-1.type = avro
>     local_agent.sinks.avroSink-1.hostname = 127.0.0.1
>     local_agent.sinks.avroSink-1.port = 4545
>     local_agent.sinks.avroSink-1.channel = memoryChannel-1
>
>     #
>     # Properties for the avro sink agent to collector 2
>     #
>     local_agent.sinks.avroSink-2.type = avro
>     local_agent.sinks.avroSink-2.hostname = 127.0.0.1
>     local_agent.sinks.avroSink-2.port = 4546
>     local_agent.sinks.avroSink-2.channel = memoryChannel-1
>
>     #
>     # Properties for the avro source collector 1
>     #
>     collector_agent_1.sources.avroSource-1.type = avro
>     collector_agent_1.sources.avroSource-1.bind = 127.0.0.1
>     collector_agent_1.sources.avroSource-1.port = 4545
>     collector_agent_1.sources.avroSource-1.channels = memoryChannel-1 memoryChannel-2
>
>     #
>     # Properties for the avro source collector 2
>     #
>     collector_agent_2.sources.avroSource-2.type = avro
>     collector_agent_2.sources.avroSource-2.bind = 127.0.0.1
>     collector_agent_2.sources.avroSource-2.port = 4546
>     collector_agent_2.sources.avroSource-2.channels = memoryChannel-1 memoryChannel-2
>
>     # End points for collector 1
>
>     # ElasticSearch endpoint collector 1
>
>     collector_agent_1.sinks.elastic-search-sink-1.type = null
>     collector_agent_1.sinks.elastic-search-sink-1.channel = memoryChannel-1
>
>     # HDFS endpoint collector 1
>
>     collector_agent_1.sinks.sink1.type = null
>     collector_agent_1.sinks.sink1.channel = memoryChannel-2
>
>     # ElasticSearch endpoint collector 2
>
>     collector_agent_2.sinks.elastic-search-sink-1.type = null
>     collector_agent_2.sinks.elastic-search-sink-1.channel = memoryChannel-1
>
>     # HDFS endpoint collector 2
>
>     collector_agent_2.sinks.sink1.type = null
>     collector_agent_2.sinks.sink1.channel = memoryChannel-2
>
>     # Specify priorities for the sinks on the agent
>
>     local_agent.sinkgroups.ha.sinks = avroSink-1 avroSink-2
>     local_agent.sinkgroups.ha.processor.type = failover
>     local_agent.sinkgroups.ha.processor.priority.avroSink-1 = 2
>     local_agent.sinkgroups.ha.processor.priority.avroSink-2 = 1
>
>     # Wire the source agents up
>
>     local_agent.sources = spooldir-1
>     local_agent.sinks = avroSink-1 avroSink-2
>     local_agent.sinkgroups = ha
>     local_agent.channels = memoryChannel-1
>
>     # Wire the collector agents up
>
>     collector_agent_1.sources = avroSource-1
>     collector_agent_1.sinks = elastic-search-sink-1 sink1
>     collector_agent_1.channels = memoryChannel-1 memoryChannel-2
>
>     collector_agent_2.sources = avroSource-2
>     collector_agent_2.sinks = elastic-search-sink-1 sink1
>     collector_agent_2.channels = memoryChannel-1 memoryChannel-2
>
>
>
>
> -- 
> Thanks
>
> Cameron Gandevia

