flume-user mailing list archives

From Mohit Durgapal <durgapalmo...@gmail.com>
Subject Re: issue with failover sinks in flume
Date Thu, 18 Sep 2014 05:14:44 GMT
Hi Hari,

This is our latest config:


agent1tier1.sources = tcpsrc
agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup

agent1tier1.channels = channelbucket01 channelbucket02
agent1tier1.channels.channelbucket01.type = file
agent1tier1.channels.channelbucket02.type = file


agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
agent1tier1.sources.tcpsrc.type = syslogtcp
agent1tier1.sources.tcpsrc.port = 5149
agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
agent1tier1.sources.tcpsrc.interceptors=i1
agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp




#################### INTERCEPTOR ##############################
agent1tier1.sources.tcpsrc.interceptors=logsintercept
agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02



#################### END OF INTERCEPTOR ##############################



####################### SELECTOR ###########################

agent1tier1.sources.tcpsrc.selector.type=multiplexing
agent1tier1.sources.tcpsrc.selector.header = bucket
agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
agent1tier1.sources.tcpsrc.selector.default = channelbucket01

##################### END OF SELECTOR #############################



#################### CHANNELS ##############################

agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data

agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data


#################### END OF CHANNELS ##############################






################## CHANNELS CAPACITY ############################


agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
agent1tier1.channels.channelbucket01.checkpointInterval = 30000
agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
agent1tier1.channels.channelbucket01.capacity = 10000000

agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
agent1tier1.channels.channelbucket02.checkpointInterval = 30000
agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
agent1tier1.channels.channelbucket02.capacity = 10000000


################## END OF CHANNELS CAPACITY ############################



 # avro sink properties
agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01.type = avro
agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch01.port = 10000

 # avro sink properties
agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01backup.type = avro
agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch01backup.port = 19999

 # avro sink properties
agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02.type = avro
agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch02.port = 19999

 # avro sink properties
agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02backup.type = avro
agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch02backup.port = 10000



agent1tier1.sinkgroups = grpch1
agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 2
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000



agent1tier1.sinkgroups = grpch2
agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 1
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
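
For reference, the custom interceptor configured above hashes a field of each event and writes the result into the "bucket" header that the multiplexing selector routes on. Below is a minimal sketch of that idea (hashing on the whole event body and the hard-coded "bucket" header name are just for illustration; the real interceptor hashes a specific field of the event):

package com.custom.flume.interceptor;

import java.util.Arrays;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Simplified sketch of a hash-bucketing interceptor; not the exact production code.
public class eventTweaker implements Interceptor {

  private final String[] buckets;

  private eventTweaker(String[] buckets) {
    this.buckets = buckets;
  }

  @Override
  public void initialize() {}

  @Override
  public Event intercept(Event event) {
    // Hash a field of the event (the whole body here, for illustration) into one
    // of the configured buckets and record the choice in the "bucket" header,
    // which the multiplexing channel selector uses to pick a channel.
    int idx = (Arrays.hashCode(event.getBody()) & Integer.MAX_VALUE) % buckets.length;
    event.getHeaders().put("bucket", buckets[idx]);
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event e : events) {
      intercept(e);
    }
    return events;
  }

  @Override
  public void close() {}

  public static class Builder implements Interceptor.Builder {
    private String[] buckets;

    @Override
    public void configure(Context context) {
      // "hashbuckets" matches the property set in the interceptor config above.
      buckets = context.getString("hashbuckets", "bucket01,bucket02").split(",");
    }

    @Override
    public Interceptor build() {
      return new eventTweaker(buckets);
    }
  }
}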


Regards
Mohit

On Thu, Sep 18, 2014 at 10:38 AM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:

> Can you send your latest config?
>
> Thanks,
> Hari
>
>
> On Tue, Sep 16, 2014 at 6:01 AM, Mohit Durgapal <durgapalmohit@gmail.com> wrote:
>
>>  We have a two-stage topology in Flume. In the first tier we add a header
>> based on the hash value of a field in the event. The hashing logic lives in
>> a Tier 1 interceptor, which sets a header field, and a multiplexing selector
>> then routes events to Tier 2 based on that header.
>>  In the second tier we store the events locally using file_roll and also
>> write the same events to HDFS.
>>
>> Everything works fine when we are not using the failover sinks. When we
>> add the failover sink configuration in the first tier, our hashing logic
>> gets overridden: even when all the machines in our Tier 2 are active and
>> running, some events that were meant for Flume agent 1 (based on hashing &
>> multiplexing) go to agent 2.
>>
>> Also, we are running this test on three machines: one machine for Tier 1
>> (let's say machine A) and two machines (B and C) for Tier 2. In Tier 2,
>> machine C acts as the failover backup for the Flume agent on machine B, and
>> machine B acts as the failover backup for the agent on machine C.
>>
>> Any idea what could be wrong with this configuration?
>>
>> Below are the tier wise configurations:
>>
>> *Tier 1:*
>>
>> agent1tier1.sources = tcpsrc
>> agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
>>
>> agent1tier1.channels = channelbucket01 channelbucket02
>> agent1tier1.channels.channelbucket01.type = file
>> agent1tier1.channels.channelbucket02.type = file
>>
>>
>> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
>> agent1tier1.sources.tcpsrc.type = syslogtcp
>> agent1tier1.sources.tcpsrc.port = 5149
>> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
>> agent1tier1.sources.tcpsrc.interceptors=i1
>> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>>
>>
>>
>>
>> #################### INTERCEPTOR ##############################
>> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>>
>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
>>
>>
>>
>> #################### END OF INTERCEPTOR ##############################
>>
>>
>>
>> ####################### SELECTOR ###########################
>>
>> agent1tier1.sources.tcpsrc.selector.type=multiplexing
>> agent1tier1.sources.tcpsrc.selector.header = bucket
>> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
>> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
>> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>>
>> ##################### END OF SELECTOR #############################
>>
>>
>>
>> #################### CHANNELS ##############################
>>
>> agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
>> agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>>
>> agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
>> agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>>
>>
>> #################### CHANNELS ##############################
>>
>>
>>
>>
>>
>>
>> ################## CHANNELS CAPACITY ############################
>>
>>
>> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
>> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
>> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
>> agent1tier1.channels.channelbucket01.capacity = 10000000
>>
>> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
>> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
>> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
>> agent1tier1.channels.channelbucket02.capacity = 10000000
>>
>>
>> ################## END OF CHANNELS CAPACITY ############################
>>
>>
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
>> agent1tier1.sinks.avro-forward-ch01.type = avro
>> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
>> agent1tier1.sinks.avro-forward-ch01.port = 10000
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
>> agent1tier1.sinks.avro-forward-ch01backup.type = avro
>> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
>> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
>> agent1tier1.sinks.avro-forward-ch02.type = avro
>> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
>> agent1tier1.sinks.avro-forward-ch02.port = 19999
>>
>>  # avro sink properties
>> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
>> agent1tier1.sinks.avro-forward-ch02backup.type = avro
>> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
>> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>>
>>
>>
>> agent1tier1.sinkgroups = grpch1
>> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
>> agent1tier1.sinkgroups.grpch1.processor.type = failover
>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 10
>> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>>
>>
>>
>> agent1tier1.sinkgroups = grpch2
>> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
>> agent1tier1.sinkgroups.grpch2.processor.type = failover
>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 11
>> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>>
>>
>>
>> *Tier 2:*
>>
>> tier2.sources  = avro-AppSrv-source
>> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
>> tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
>> tier2.channels.channelimp.type = file
>> tier2.channels.channelconv.type = file
>> tier2.channels.channelclk.type = file
>> tier2.channels.channelrt.type = file
>> tier2.channels.channelhdfsrt.type = file
>> tier2.channels.channelhdfsdel.type = file
>>
>> # For each source, channel, and sink, set
>> # standard properties.
>> # properties of avro-AppSrv-source
>> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
>> tier2.sources.avro-AppSrv-source.type = avro
>> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
>> tier2.sources.avro-AppSrv-source.port = 10000
>>
>>
>>
>>
>>
>>
>> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
>> tier2.sources.avro-AppSrv-source.selector.header = rectype
>> tier2.sources.avro-AppSrv-source.selector.mapping.IMP = channelimp channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv channelhdfsdel
>>
>> tier2.sources.avro-AppSrv-source.selector.mapping.RT = channelrt channelhdfsrt
>>
>>
>> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>>
>>
>>
>> tier2.sinks.impsink.type = file_roll
>> tier2.sinks.impsink.channel = channelimp
>> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
>> tier2.sinks.impsink.sink.rollInterval=60
>>
>> tier2.sinks.convsink.type = file_roll
>> tier2.sinks.convsink.channel = channelconv
>> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
>> tier2.sinks.convsink.sink.rollInterval=60
>>
>> tier2.sinks.clksink.type = file_roll
>> tier2.sinks.clksink.channel = channelclk
>> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
>> tier2.sinks.clksink.sink.rollInterval=60
>>
>>
>> tier2.sinks.rtsink.type = file_roll
>> tier2.sinks.rtsink.channel = channelrt
>> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
>> tier2.sinks.rtsink.sink.rollInterval=60
>>
>>
>> #################### CHANNELS ##############################
>>
>> tier2.channels.channelimp.checkpointDir = /home/flume/channelimp/file-channel/checkpoint
>> tier2.channels.channelimp.dataDirs = /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>>
>>
>> tier2.channels.channelclk.checkpointDir = /home/flume/channelclk/file-channel/checkpoint
>> tier2.channels.channelclk.dataDirs = /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>>
>> tier2.channels.channelconv.checkpointDir = /home/flume/channelconv/file-channel/checkpoint
>> tier2.channels.channelconv.dataDirs = /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>>
>> tier2.channels.channelrt.checkpointDir = /home/flume/channelrt/file-channel/checkpoint
>> tier2.channels.channelrt.dataDirs = /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>>
>> tier2.channels.channelhdfsrt.checkpointDir = /home/flume/channelhdfsrt/file-channel/checkpoint
>> tier2.channels.channelhdfsrt.dataDirs = /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>>
>> tier2.channels.channelhdfsdel.checkpointDir = /home/flume/channelhdfsdel/file-channel/checkpoint
>> tier2.channels.channelhdfsdel.dataDirs = /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>>
>>
>>
>>
>> #################### CHANNELS ##############################
>>
>>
>> tier2.sinks.hdfssinkrt.type = hdfs
>> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
>> tier2.sinks.hdfssinkrt.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
>> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
>> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
>> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
>> # Roll based on the block size only
>> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
>> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
>> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
>> # seconds to wait before closing the file.
>> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
>> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
>> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
>> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
>> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
>> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>>
>>
>> tier2.sinks.hdfssinkdel.type = hdfs
>> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
>> tier2.sinks.hdfssinkdel.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
>> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
>> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
>> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
>> # Roll based on the block size only
>> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
>> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
>> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
>> # seconds to wait before closing the file.
>> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
>> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
>> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
>> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
>> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
>> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
>> #################### END OF SINKS ##############################
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>
