flume-user mailing list archives

From "Martin, Ray" <arma...@tycho.ncsc.mil>
Subject RE: accumulo sink
Date Tue, 10 Jul 2012 20:01:46 GMT
I created a custom Flume sink.

I added the appropriate jars to the lib directory of flume-ng.

I set the classpath in conf/flume-env.sh.

I configured conf/flume.conf as follows:

# Sources, channels and sinks are defined per agent

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define the custom Accumulo sink and connect it to the other end
# of the same channel.
agent1.sinks.accumulo-sink1.channel = ch1
agent1.sinks.accumulo-sink1.type = flumeSink.AccumuloSink

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = accumulo-sink1

I ran Flume and got a NullPointerException:

./bin/flume-ng node --conf conf/ -f conf/flume.conf -n agent1

2012-07-10 15:55:38,990 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:58)] Starting lifecycle supervisor 1

2012-07-10 15:55:38,994 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)] Flume node starting - agent1

2012-07-10 15:55:39,001 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:110)] Node manager starting

2012-07-10 15:55:39,001 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)] Configuration provider starting

2012-07-10 15:55:39,004 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:58)] Starting lifecycle supervisor 9

2012-07-10 15:55:39,010 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:114)] Node manager started

2012-07-10 15:55:39,010 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:87)] Configuration provider started

2012-07-10 15:55:39,010 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes

2012-07-10 15:55:39,015 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:196)] Reloading configuration file:conf/flume.conf

2012-07-10 15:55:39,023 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.properties.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:225)] Starting validation of configuration for agent: agent1, initial-configuration: AgentConfiguration[agent1]
SOURCES: {avro-source1=ComponentConfiguration[avro-source1]
  CONFIG: {port=41414, channels=ch1, type=avro, bind=0.0.0.0}
  RUNNER:   ComponentConfiguration[runner]
    CONFIG: {}
}
CHANNELS: {ch1=ComponentConfiguration[ch1]
  CONFIG: {type=memory}
}
SINKS: {accumulo-sink1=ComponentConfiguration[accumulo-sink1]
  CONFIG: {type=flumeSink.AccumuloSink, channel=ch1}
  RUNNER:   ComponentConfiguration[runner]
    CONFIG: {}
}

2012-07-10 15:55:39,024 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)] Post-validation flume configuration contains configuation  for agents: [agent1]

2012-07-10 15:55:39,026 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:67)] Creating instance of channel ch1 type memory

2012-07-10 15:55:39,037 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:73)] Creating instance of source avro-source1, type avro

2012-07-10 15:55:39,057 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:69)] Creating instance of sink accumulo-sink1 typeflumeSink.AccumuloSink

2012-07-10 15:55:39,074 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:77)] Sink type flumeSink.AccumuloSink is a custom type

2012-07-10 15:55:39,118 (conf-file-poller-0) [DEBUG - org.apache.hadoop.conf.Configuration.<init>(Configuration.java:226)] java.io.IOException: config(config)
        at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:226)
        at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:184)
        at org.apache.hadoop.mapreduce.JobContext.<init>(JobContext.java:52)
        at org.apache.hadoop.mapreduce.Job.<init>(Job.java:49)
        at accumuloAccess.writers.IngestToSystemBehavior.setInstance(IngestToSystemBehavior.java:42)
        at flumeSink.AccumuloSink.<init>(AccumuloSink.java:41)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at java.lang.Class.newInstance0(Class.java:372)
        at java.lang.Class.newInstance(Class.java:325)
        at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:102)
        at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSinks(PropertiesFileConfigurationProvider.java:303)
        at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:214)
        at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
        at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
        at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

2012-07-10 15:55:39,120 (conf-file-poller-0) [ERROR - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:205)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to create sink: accumulo-sink1, type: flumeSink.AccumuloSink, class: flumeSink.AccumuloSink
        at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:108)
        at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSinks(PropertiesFileConfigurationProvider.java:303)
        at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:214)
        at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
        at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
        at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:230)
        at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:184)
        at org.apache.hadoop.mapreduce.JobContext.<init>(JobContext.java:52)
        at org.apache.hadoop.mapreduce.Job.<init>(Job.java:49)
        at accumuloAccess.writers.IngestToSystemBehavior.setInstance(IngestToSystemBehavior.java:42)
        at flumeSink.AccumuloSink.<init>(AccumuloSink.java:41)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at java.lang.Class.newInstance0(Class.java:372)
        at java.lang.Class.newInstance(Class.java:325)
        at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:102)
        ... 13 more

2012-07-10 15:56:09,016 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes

2012-07-10 15:56:39,016 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes

Does anyone have pointers on how to track down and fix this?
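For anyone hitting the same error: the trace suggests the failure happens while Flume is still constructing the sink, before configure() or start() is ever called. DefaultSinkFactory.create instantiates flumeSink.AccumuloSink reflectively (Class.newInstance in the trace), and that constructor calls IngestToSystemBehavior.setInstance, which constructs a Hadoop Job at line 42. The DEBUG line "java.io.IOException: config(config)" appears to be logged by Hadoop's Configuration copy constructor, so the NullPointerException a few lines further into that same constructor most likely means the Configuration passed to new Job(...) was null. A minimal guard, assuming Hadoop 1.x on the classpath; JobFactory and newJob are hypothetical names, not part of the poster's code or of Hadoop:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

/**
 * Hypothetical helper (not part of the original code): never hands a null
 * Configuration to the Job constructor.
 */
public final class JobFactory {

  private JobFactory() {
  }

  /**
   * Builds a Job from the given Configuration, falling back to a default
   * Configuration (core-default.xml and core-site.xml from the classpath)
   * when the caller passes null.
   */
  public static Job newJob(Configuration conf) throws IOException {
    // new Job(null) reaches Configuration's copy constructor, which
    // dereferences its argument and fails with a bare NullPointerException.
    Configuration effective = (conf != null) ? conf : new Configuration();
    return new Job(effective);
  }
}
```

Whether to fall back to defaults like this or fail fast with a descriptive message (for example via java.util.Objects.requireNonNull) depends on where that Configuration is supposed to come from inside IngestToSystemBehavior, which the trace does not show.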

Thanx.

 

From: Eric Sammer [mailto:esammer@cloudera.com] 
Sent: Wednesday, June 20, 2012 3:56 PM
To: flume-user@incubator.apache.org
Subject: Re: accumulo sink

 

Ray:

 

Not offhand, no. That said, one shouldn't be terribly difficult to
build. I'm less familiar with the Accumulo client APIs, but Accumulo is
close enough to HBase that I don't think it would take you more than a
day or so, including basic testing. Take a look at one of the existing
sinks (LoggerSink is one of the more basic ones) for a template to
start from.
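A minimal skeleton in the spirit of LoggerSink, written against the Flume NG 1.x sink API (AbstractSink, Configurable, Channel, Transaction) and assuming flume-ng-core is on the classpath. The "table" property, its default value, and the writeToAccumulo placeholder are hypothetical, and the actual Accumulo client calls are deliberately left out. The constructor does no work because Flume creates the sink reflectively before any configuration is available; settings are read in configure() and connections belong in start().

```java
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

/**
 * Skeleton custom sink modeled on Flume's LoggerSink. Declare it in your own
 * package and reference it by fully qualified class name in flume.conf.
 */
public class AccumuloSink extends AbstractSink implements Configurable {

  // Hypothetical setting, read from <agent>.sinks.<name>.table in flume.conf.
  private String table;

  // Keep the no-arg constructor trivial: Flume instantiates the sink via
  // reflection before calling configure().
  public AccumuloSink() {
  }

  @Override
  public void configure(Context context) {
    table = context.getString("table", "flume_events");
  }

  @Override
  public synchronized void start() {
    // Open the Accumulo connection and writer here (omitted).
    super.start();
  }

  @Override
  public synchronized void stop() {
    // Flush and close the Accumulo writer here (omitted).
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction tx = channel.getTransaction();
    tx.begin();
    try {
      Event event = channel.take();
      if (event == null) {
        // Channel is empty: commit the empty transaction and back off.
        tx.commit();
        return Status.BACKOFF;
      }
      writeToAccumulo(event.getBody());
      tx.commit();
      return Status.READY;
    } catch (Exception e) {
      // Roll back so the event stays in the channel and is retried later.
      tx.rollback();
      throw new EventDeliveryException("Failed to write event to " + table, e);
    } finally {
      tx.close();
    }
  }

  // Hypothetical placeholder for the Accumulo write (e.g. a Mutation on a BatchWriter).
  private void writeToAccumulo(byte[] body) {
  }

  String getTable() {
    return table;
  }
}
```

With this class on Flume's classpath, a line such as agent1.sinks.accumulo-sink1.table = events would set the hypothetical table property.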

On Tue, Jun 19, 2012 at 1:14 PM, Martin, Ray <armart3@tycho.ncsc.mil> wrote:

Is anyone aware of a Flume sink for Accumulo?

Thanx.





 

-- 
Eric Sammer
twitter: esammer
data: www.cloudera.com

