flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kumar, Suresh" <Suresh.Kum...@emc.com>
Subject RE: Flume Source and Sink in different hosts
Date Thu, 04 Oct 2012 22:19:04 GMT
Yes, my HBase has the table and column family, if I run the /etc/passwd test using flume-ng
client, the table

gets populated.

 

Here is the log from the source agent, there is nothing much in the sink except for which
seem to benign.


Thanks,

Suresh

 

2012-10-04 14:59:05,622 (lifecycleSupervisor-1-0-SendThread(localhost:2181)) [DEBUG - org.apache.zookeeper.client.ZooKeeperSaslClient.clientTunneledAuthenticationInProgress(ZooKeeperSaslClient.java:515)]
Could not retrieve login configuration: java.lang.SecurityException: Unable to locate a login
configuration

2012-10-04 14:59:08,414 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes

 

source agent log:

 

 

$ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n
agent3

 

+ exec /usr/lib/jvm/java-6-sun/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/opt/flume/conf:/opt/flume/lib/*'
-Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent3

2012-10-04 15:09:30,778 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 1

2012-10-04 15:09:30,791 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)]
Flume node starting - agent3

2012-10-04 15:09:30,799 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:203)]
Node manager starting

2012-10-04 15:09:30,801 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)]
Configuration provider starting

2012-10-04 15:09:30,810 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 10

2012-10-04 15:09:30,813 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:207)]
Node manager started

2012-10-04 15:09:30,819 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:86)]
Configuration provider started

2012-10-04 15:09:30,819 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes

2012-10-04 15:09:30,821 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)]
Reloading configuration file:conf/flume.conf

2012-10-04 15:09:30,839 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:912)]
Added sinks: avro-sink Agent: agent3

2012-10-04 15:09:30,840 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

2012-10-04 15:09:30,840 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1002)]
Created context for avro-sink: hostname

2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

2012-10-04 15:09:30,841 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:295)]
Starting validation of configuration for agent: agent3, initial-configuration: AgentConfiguration[agent3]

SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=MemoryChannel-1,
type=exec} }}

CHANNELS: {MemoryChannel-1={ parameters:{type=memory} }}

SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=MemoryChannel-1}
}}

 

2012-10-04 15:09:30,854 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:450)]
Created channel MemoryChannel-1

2012-10-04 15:09:30,883 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:655)]
Creating sink: avro-sink using AVRO

2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:353)]
Post validation configuration for agent3

AgentConfiguration created without Configuration stubs for which only basic syntactical validation
was performed[agent3]

SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=MemoryChannel-1,
type=exec} }}

CHANNELS: {MemoryChannel-1={ parameters:{type=memory} }}

SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=MemoryChannel-1}
}}

 

2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:117)]
Channels:MemoryChannel-1

 

2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:118)]
Sinks avro-sink

 

2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)]
Sources tail

 

2012-10-04 15:09:30,885 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)]
Post-validation flume configuration contains configuration  for agents: [agent3]

2012-10-04 15:09:30,886 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)]
Creating channels

2012-10-04 15:09:30,886 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:68)]
Creating instance of channel MemoryChannel-1 type memory

2012-10-04 15:09:31,013 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: CHANNEL, name: MemoryChannel-1, registered successfully.

2012-10-04 15:09:31,013 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)]
created channel MemoryChannel-1

2012-10-04 15:09:31,014 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:74)]
Creating instance of source tail, type exec

2012-10-04 15:09:31,037 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)]
Creating instance of sink: avro-sink, type: avro

2012-10-04 15:09:31,045 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: SINK, name: avro-sink, registered successfully.

2012-10-04 15:09:31,046 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:106)]
Starting new configuration:{ sourceRunners:{tail=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:tail,state:IDLE}
}} sinkRunners:{avro-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@e949f69
counterGroup:{ name:null counters:{} } }} channels:{MemoryChannel-1=org.apache.flume.channel.MemoryChannel{name:
MemoryChannel-1}} }

2012-10-04 15:09:31,046 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:113)]
Starting Channel MemoryChannel-1

2012-10-04 15:09:31,049 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:128)]
Waiting for channel: MemoryChannel-1 to start. Sleeping for 500 ms

2012-10-04 15:09:31,052 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: CHANNEL, name: MemoryChannel-1 started

2012-10-04 15:09:31,550 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:141)]
Starting Sink avro-sink

2012-10-04 15:09:31,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:239)]
Starting AvroSink avro-sink { host: sig-flume, port: 41414 }...

2012-10-04 15:09:31,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: SINK, name: avro-sink started

2012-10-04 15:09:31,552 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:152)]
Starting Source tail

2012-10-04 15:09:31,554 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
Avro sink avro-sink: Building RpcClient with hostname: sig-flume, port: 41414

2012-10-04 15:09:31,561 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)]
Exec source starting with command:tail -F /var/log/auth.log

2012-10-04 15:09:31,586 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.source.ExecSource.start(ExecSource.java:173)]
Exec source started

2012-10-04 15:09:31,626 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)]
Batch size string = null

2012-10-04 15:09:32,684 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)]
Avro sink avro-sink: Created RpcClient: NettyAvroRpcClient { host: sig-flume, port: 41414
}

2012-10-04 15:09:32,685 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:253)]
Avro sink avro-sink started.

2012-10-04 15:09:32,689 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)]
Polling sink runner starting

2012-10-04 15:10:01,565 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes

2012-10-04 15:10:31,567 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(Abstr

 

 

From: Hari Shreedharan [mailto:hshreedharan@cloudera.com] 
Sent: Thursday, October 04, 2012 3:02 PM
To: user@flume.apache.org
Subject: Re: Flume Source and Sink in different hosts

 

Can you send the logs also, of both agents? Does your Hbase cluster have the said column family
and table with that family? 

 

Also are you sure the files are not getting rotated out. You should use tail -F so that your
code works even with files getting rotated out.

 

 

Hari

-- 

Hari Shreedharan

 

On Thursday, October 4, 2012 at 2:53 PM, Kumar, Suresh wrote:

	Hello:

	 

	I have just downloaded and build flume-ng (apache-flume-1.3.0-SNAPSHOT).

	 

	My goal is to collect log data from HostA (source) and send it to HostB(sink), my initial
test (sending /etc/passwd) 

	from HostA to HostB worked fine, I was also able to load the passwd file into my HBase in
HostB.

	 

	Now, I want to load a continuous stream of log data (using tail –f), but I was not able
to replicate the above process.

	Flume just started fine in HostA, but I do not see any data being received by HostB or in
my HBase.

	 

	What is wrong with my configuration?

	 

	Thanks,

	Suresh

	 

	Here is my flume.conf in HostA

	 

	agent3.sources = tail

	agent3.channels = MemoryChannel-1

	agent3.sinks = avro-sink

	 

	# Define source flow

	agent3.sources.tail.type = exec

	agent3.sources.tail.command = tail -f /var/log/auth.log

	agent3.sources.tail.channels = MemoryChannel-1

	 

	# What kind of channel

	agent3.channels.MemoryChannel-1.type = memory

	 

	# avro sink properties

	agent3.sinks.avro-sink.type = avro

	agent3.sinks.avro-sink.channel = MemoryChannel-1

	agent3.sinks.avro-sink.hostname = hostb

	agent3.sinks.avro-sink.port = 41414

	 

	Here is my flume.conf in HostB

	 

	# Define a memory channel called ch1 on agent1

	agent1.channels.ch1.type = memory

	 

	# Define an Avro source called avro-source1 on agent1 and tell it

	# to bind to 0.0.0.0:41414. Connect it to channel ch1.

	agent1.sources.avro-source1.channels = ch1

	agent1.sources.avro-source1.type = avro

	agent1.sources.avro-source1.bind = 0.0.0.0

	agent1.sources.avro-source1.port = 41414

	 

	# Define a logger sink that simply logs all events it receives

	# and connect it to the other end of the same channel.

	agent1.sinks.log-sink1.channel = ch1

	agent1.sinks.log-sink1.type = logger

	 

	# Finally, now that we've defined all of our components, tell

	# agent1 which ones we want to activate.

	agent1.channels = ch1

	agent1.sources = avro-source1

	#agent1.sources = avro-source1

	agent1.sinks = sink1

	 

	agent1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink

	agent1.sinks.sink1.channel = ch1

	agent1.sinks.sink1.table = flumedemo

	agent1.sinks.sink1.columnFamily = testing

	agent1.sinks.sink1.column = foo

	agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

	agent1.sinks.sink1.serializer.payloadColumn = col1

	agent1.sinks.sink1.serializer.keyType = timestamp

	agent1.sinks.sink1.serializer.rowPrefix = 1

	agent1.sinks.sink1.serializer.suffix = timestamp

	agent1.sinks.sink1.serializer.payloadColumn = pcol

	agent1.sinks.sink1.serializer.incrementColumn = icol

	 

 

Mime
View raw message