flume-user mailing list archives

From shushuai zhu <ss...@yahoo.com>
Subject Re: ElasticSearchSink does not work
Date Wed, 12 Jun 2013 14:35:25 GMT
Allan, thanks for the reply. In my case, I used only one channel and one sink at a time.
About 10 minutes after the data were sent to the Flume agent, some messages were logged in
flume.log (see below). They say that the class org/elasticsearch/common/transport/TransportAddress was
not found. This seems to indicate that the Cloudera version of Flume does not support ElasticSearchSink.
Is there any way to add the missing class or jar file?
I also tried to download Flume from the Flume site:
But the downloaded apache-flume-1.3.1-bin.tar.gz is reported as neither a gzip file nor a tar
file on my Linux box (Red Hat 5). Can anyone tell me the exact download process? If
possible, please provide step-by-step instructions for downloading and installation.
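
One way to narrow down the "not a gzip file nor a tar file" complaint is to look at what was actually saved to disk. The following is a minimal sketch, not an official Flume tool; the function name `describe_download` is hypothetical, and the HTML check reflects only one possible cause (a web page saved under the tarball's name), which the message above does not confirm.

```python
import tarfile
import zlib
from pathlib import Path

GZIP_MAGIC = b"\x1f\x8b"  # every gzip stream starts with these two bytes


def describe_download(path):
    """Return a short verdict on whether `path` looks like a .tar.gz archive."""
    p = Path(path)
    with p.open("rb") as f:
        head = f.read(512)
    if head[:2] != GZIP_MAGIC:
        lowered = head.lower()
        # Assumption: a saved web page is one plausible reason for a bad download.
        if b"<html" in lowered or b"<!doctype" in lowered:
            return "not gzip: looks like an HTML page"
        return "not gzip: unknown content"
    try:
        with tarfile.open(p, mode="r:gz") as tar:
            tar.getmembers()  # reads the whole archive, so truncation shows up here
    except (tarfile.TarError, OSError, EOFError, zlib.error) as exc:
        return f"gzip header present but archive unreadable: {exc}"
    return "ok"
```

Calling `describe_download("apache-flume-1.3.1-bin.tar.gz")` in the download directory would return one of the verdict strings above; anything other than "ok" suggests the file should be fetched again.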
11 Jun 2013 19:40:37,082 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:61) 
- Configuration provider starting
11 Jun 2013 19:40:37,114 INFO  [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:133) 
- Reloading configuration file:conf/flume.conf
11 Jun 2013 19:40:37,121 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
- Processing:k1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:930) 
- Added sinks: k1 Agent: agent1
11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
- Processing:k1
11 Jun 2013 19:40:37,123 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
- Processing:k1
11 Jun 2013 19:40:37,123 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) 
- Processing:k1
11 Jun 2013 19:40:37,457 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140) 
- Post-validation flume configuration contains configuration for agents: [agent1]
11 Jun 2013 19:40:37,457 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:150) 
- Creating channels
11 Jun 2013 19:40:37,464 INFO  [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:40) 
- Creating instance of channel ch1 type memory
11 Jun 2013 19:40:37,468 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:205) 
- Created channel ch1
11 Jun 2013 19:40:37,469 INFO  [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:39) 
- Creating instance of source avro-source1, type avro
11 Jun 2013 19:40:37,484 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:40) 
- Creating instance of sink: k1, type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
11 Jun 2013 19:40:37,489 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145) 
- Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/elasticsearch/common/transport/TransportAddress
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:186)
        at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67)
        at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.transport.TransportAddress
        at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
        ... 15 more
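
The trace above shows that the sink class itself was located, but a class it depends on, org.elasticsearch.common.transport.TransportAddress, was not on the classpath. A quick way to confirm this is to scan the jars in the agent's lib directory for that class. The sketch below is only an illustration: the helper name `jars_containing` is hypothetical, and the assumption that the agent loads its jars from a single `lib` directory should be checked against the actual installation.

```python
import zipfile
from pathlib import Path


def jars_containing(lib_dir, class_name):
    """List the jar files directly under `lib_dir` that contain `class_name`."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            continue  # skip files that are not valid jar (zip) archives
    return hits


# Example call, assuming the working directory is the Flume install root:
# jars_containing("lib", "org.elasticsearch.common.transport.TransportAddress")
```

An empty result would be consistent with the error: the Elasticsearch client jar (and the Lucene jar it needs) would then have to be added to that directory before the sink can load.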

 From: Allan Feid <allanfeid@gmail.com>
To: user@flume.apache.org; shushuai zhu <sszhu@yahoo.com> 
Sent: Wednesday, June 12, 2013 10:09 AM
Subject: Re: ElasticSearchSink does not work

Hi Shushuai,

I've had a similar issue, and in my case it was because I was using the same channel for multiple
sinks. I believe what happens is whatever sink can remove the event from the queue first will
have it written out, but I don't know the specifics since I haven't had a chance to read through
the codebase. If you add a second channel for your elasticsearch sink and make sure your avro-source
writes to both channels, you should see data going to both locations. 
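
The behaviour described above can be illustrated with a toy model. This is not Flume code: the function names are hypothetical, and round-robin is used only as a stand-in for "whichever sink takes the event first".

```python
from collections import deque


def run_shared_channel(events):
    """Two sinks drain ONE channel, so each event reaches exactly one sink."""
    channel = deque(events)
    sinks = {"file_roll": [], "elasticsearch": []}
    names = list(sinks)
    turn = 0
    while channel:
        # Round-robin stands in for whichever sink happens to poll first.
        sinks[names[turn % len(names)]].append(channel.popleft())
        turn += 1
    return sinks


def run_channel_per_sink(events):
    """The source copies each event into TWO channels, one per sink."""
    channels = {"file_roll": deque(), "elasticsearch": deque()}
    for event in events:
        for ch in channels.values():
            ch.append(event)
    # Each sink drains only its own channel, so both see every event.
    return {name: list(ch) for name, ch in channels.items()}
```

With four events, the shared-channel model splits them between the sinks, while the channel-per-sink model delivers all four to both. In flume.conf terms this would correspond to listing both channels on the source, for example `agent1.sources.avro-source1.channels = ch1 ch2` (where `ch2` is a hypothetical second channel), and pointing each sink's `channel` property at its own channel.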


On Tue, Jun 11, 2013 at 10:37 PM, shushuai zhu <sszhu@yahoo.com> wrote:

>I am new to Flume. I am trying to send data using the Flume Client Perl API to a Flume Avro
>source, then through ElasticSearchSink to an ElasticSearch engine. I could make the file_roll sink
>work by sending the data to a file. However, I am encountering an issue with ElasticSearchSink.
>The data did not go through to the ElasticSearch engine:
>use Flume::Client;
>my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
>my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
>my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
>print "$response\n";    # response will be 'OK' on success
>since the returned $response is not defined (again this worked when file_roll sink was
>The ElasticSearch engine is working, since I could load data into it via LogStash's ElasticSearch
>output type.
>The Flume agent was installed by downloading a tarball from Cloudera:
>The flume.conf is as follows. I played around with "hostNames", using the full name, IP address,
>etc. I do not see any output message in flume.log. Could someone let me know what could potentially
>cause the issue?
># Define a memory channel called ch1 on agent1
>agent1.channels = ch1
>agent1.channels.ch1.type = memory
># Define an Avro source called avro-source1 on agent1 and tell it to bind to
># Connect it to channel ch1.
>agent1.sources = avro-source1
>agent1.sources.avro-source1.channels = ch1
>agent1.sources.avro-source1.type = avro
>agent1.sources.avro-source1.bind =
>agent1.sources.avro-source1.port = 41414
># Define a local file sink that simply logs all events it receives (this works well)
>#agent1.sinks = localout
>#agent1.sinks.localout.type = file_roll
>#agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
>#agent1.sinks.localout.sink.rollInterval = 0
>#agent1.sinks.localout.channel = ch1
># Define ElasticSearchSink sink (this does not work)
>agent1.sinks = k1
>agent1.sinks.k1.type =
>agent1.sinks.k1.hostNames = localhost:9300
>agent1.sinks.k1.indexName = flume
>agent1.sinks.k1.indexType = logs
>agent1.sinks.k1.clusterName = elasticsearch
>agent1.sinks.k1.batchSize = 2
>agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
>agent1.sinks.k1.channel = ch1