flume-user mailing list archives

From "DeCarlo, Thom" <tdeca...@mitre.org>
Subject RE: AVRO_EVENT problem
Date Thu, 06 Dec 2012 20:37:08 GMT
Interesting. It looks like the JDBC Channel causes problems for the file_roller+avro_event_serializer
sink. Everything works correctly after switching that sink to the Memory channel. Also, the
logger sink is perfectly happy to use the JDBC channel.
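
For anyone following along, the change described above might look roughly like the fragment below. This is only a sketch: the thread does not show the edited file, and memChannel-2 is a made-up name. A second memory channel is assumed so that the file sink does not compete with the logger sink for events on channel1.

```properties
# Sketch only: memChannel-2 is a hypothetical name, not from the original config.
agent1.channels = channel1 memChannel-2

# Second in-memory buffer, sized like channel1 in the original config
agent1.channels.memChannel-2.type = memory
agent1.channels.memChannel-2.capacity = 1000
agent1.channels.memChannel-2.transactionCapacity = 100

# The source copies each event to both channels;
# the file sink now drains the memory channel instead of jdbcChannel-1.
agent1.sources.source1.channels = channel1 memChannel-2
agent1.sinks.fileSink-1.channel = memChannel-2
```

Note that a memory channel loses any buffered events if the agent stops, so this is useful for isolating the problem rather than as a reliable setup.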

After thinking about this some more (and looking at the file output using the avro_event serializer),
maybe it doesn't make sense to use that serializer the way I've attempted. Unless the file
is read later by an AVRO reader the file won't be useful because it contains non-printing
characters, which probably mean something to AVRO, but not much at all to vi.

I'm going to try using the Netcat source->JDBC channel->AVRO sink, which will send to
an AVRO source->JDBC channel->file_roller sink, and see whether the JDBC channel works there.
(My target system is going to need high-reliability message passing, and the JDBC channel sounds
like it is the most robust of the available channels.)
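
A minimal sketch of that two-agent layout follows. None of it is tested. The agent names (sender, receiver), component names, and port 44444 are placeholders chosen for illustration. It assumes the two agents do not share channel storage, and it leaves the file_roll serializer at its default (plain text) so the output stays readable in an editor.

```properties
# --- First agent (hypothetical name: sender) ---
sender.sources = netcat-1
sender.channels = jdbc-1
sender.sinks = avro-out

sender.sources.netcat-1.type = netcat
sender.sources.netcat-1.bind = localhost
sender.sources.netcat-1.port = 33333
sender.sources.netcat-1.channels = jdbc-1

sender.channels.jdbc-1.type = jdbc

# Avro sink: forwards events to the second agent over Avro RPC
sender.sinks.avro-out.type = avro
sender.sinks.avro-out.hostname = localhost
sender.sinks.avro-out.port = 44444
sender.sinks.avro-out.channel = jdbc-1

# --- Second agent (hypothetical name: receiver) ---
receiver.sources = avro-in
receiver.channels = jdbc-2
receiver.sinks = file-out

# Avro source: listens on the port the first agent's Avro sink sends to
receiver.sources.avro-in.type = avro
receiver.sources.avro-in.bind = 0.0.0.0
receiver.sources.avro-in.port = 44444
receiver.sources.avro-in.channels = jdbc-2

receiver.channels.jdbc-2.type = jdbc

# No serializer is set, so file_roll uses its default text output
receiver.sinks.file-out.type = file_roll
receiver.sinks.file-out.sink.directory = c:/flume/logs
receiver.sinks.file-out.channel = jdbc-2
```

In this layout Avro is only the wire format between the two agents, so the final file does not need an Avro reader.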
--
Thom DeCarlo


-----Original Message-----
From: Brock Noland [mailto:brock@cloudera.com]
Sent: Thursday, December 06, 2012 2:52 PM
To: user@flume.apache.org
Subject: Re: AVRO_EVENT problem

OK, I don't really understand how Avro is working here, but I think
you should try FileChannel or maybe MemoryChannel for simplicity to
see if that works. I.e., I think the problem is JDBCChannel. Can you let
me know how it turns out?
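
A rough sketch of the FileChannel variant, for anyone who wants durability without the JDBC channel. The channel name fileChannel-1 and both directory paths are placeholders, not values from this thread.

```properties
# Sketch only: fileChannel-1 and the directories below are hypothetical.
agent1.channels = channel1 fileChannel-1

# Durable channel backed by files on local disk
agent1.channels.fileChannel-1.type = file
agent1.channels.fileChannel-1.checkpointDir = c:/flume/file-channel/checkpoint
agent1.channels.fileChannel-1.dataDirs = c:/flume/file-channel/data

# Point the source and the file sink at the new channel
agent1.sources.source1.channels = channel1 fileChannel-1
agent1.sinks.fileSink-1.channel = fileChannel-1
```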

On Thu, Dec 6, 2012 at 1:45 PM, Brock Noland <brock@cloudera.com> wrote:
> Hmm, looks like I missed something. I'm not familiar with this code.
>
> On Thu, Dec 6, 2012 at 1:29 PM, DeCarlo, Thom <tdecarlo@mitre.org> wrote:
>> Hmmm... How does one define/check/set the schema? I lifted my configuration straight from the on-line documentation and it only mentions a schema in the JDBC Channel section.
>>
>> --
>> Thom DeCarlo
>>
>>
>> -----Original Message-----
>> From: Brock Noland [mailto:brock@cloudera.com]
>> Sent: Thursday, December 06, 2012 1:57 PM
>> To: user@flume.apache.org
>> Subject: Re: AVRO_EVENT problem
>>
>> It seems to me like the object you are trying to write doesn't match
>> the Schema? I'd check your convert() method.
>>
>> On Thu, Dec 6, 2012 at 9:09 AM, DeCarlo, Thom <tdecarlo@mitre.org> wrote:
>>> Oh, right... forgot the version. I'm using the latest (as of yesterday) from the git repository. It gets built as 1.4-SNAPSHOT.
>>>
>>> --
>>> Thom DeCarlo
>>>
>>>
>>> -----Original Message-----
>>> From: Brock Noland [mailto:brock@cloudera.com]
>>> Sent: Thursday, December 06, 2012 10:06 AM
>>> To: user@flume.apache.org
>>> Subject: Re: AVRO_EVENT problem
>>>
>>> Hi,
>>>
>>> Hopefully someone will be able to answer the Avro issue. To help
>>> them, what version of Flume are you running?
>>>
>>> Brock
>>>
>>> On Thu, Dec 6, 2012 at 8:59 AM, DeCarlo, Thom <tdecarlo@mitre.org> wrote:
>>>> Hi,
>>>> I'm just getting started with flume, so I apologize if this is an already known problem.
>>>>
>>>> I'm trying to set up a FILE_ROLL sink that uses the AVRO_EVENT serializer. But, when I start the agent I get an exception thrown from within the AVRO DataFileWriter class. It is failing when trying to cast a java.util.ArrayList to java.util.Map.
>>>>
>>>> Can anyone tell me how this is supposed to work?
>>>>
>>>> OBTW, I'm running on Windows 7 (64-bit), and I've tried both the 32- and 64-bit versions of both Java 1.6.0_37 and 1.7.0_09, with the same results.
>>>>
>>>> Thanks,
>>>> Thom
>>>> --
>>>> The console output looks like this:
>>>>
>>>> 2012-12-04 14:24:25,111 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:135)] RollingFileSink fileSink-1 started.
>>>> 2012-12-04 14:24:25,145 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:33333]
>>>> 2012-12-04 14:24:25,593 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.channel.jdbc.impl.JdbcTransactionImpl.rollback(JdbcTransactionImpl.java:102)] Marking transaction for rollback
>>>> 2012-12-04 14:24:25,594 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.channel.jdbc.impl.JdbcTransactionImpl.close(JdbcTransactionImpl.java:118)] Attempting transaction roll-back
>>>> 2012-12-04 14:24:25,604 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
>>>> org.apache.flume.EventDeliveryException: Failed to process transaction
>>>>         at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>         at java.lang.Thread.run(Thread.java:722)
>>>> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.util.Map
>>>>         at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
>>>>         at org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>>>         at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:195)
>>>>         ... 3 more
>>>> Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.util.Map
>>>>         at org.apache.avro.generic.GenericDatumWriter.getMapSize(GenericDatumWriter.java:174)
>>>>         at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:159)
>>>>         at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
>>>>         at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
>>>>         at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:105)
>>>>         at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:65)
>>>>         at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
>>>>         at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:57)
>>>>         at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
>>>>         ... 5 more
>>>>
>>>> --
>>>> My config file looks like this:
>>>>
>>>> ########################################
>>>> # Sources
>>>> ########################################
>>>> agent1.sources = source1
>>>>
>>>> # Describe/configure source1
>>>> agent1.sources.source1.type = netcat
>>>> agent1.sources.source1.bind = localhost
>>>> agent1.sources.source1.port = 33333
>>>> agent1.sources.source1.interceptors = inter1 inter2
>>>> agent1.sources.source1.interceptors.inter1.type = timestamp
>>>> agent1.sources.source1.interceptors.inter2.type = host
>>>> agent1.sources.source1.interceptors.inter2.hostHeader = hostname
>>>>
>>>> ########################################
>>>> # Channels
>>>> ########################################
>>>> agent1.channels = channel1 jdbcChannel-1
>>>>
>>>> # Use a channel which buffers events in memory
>>>> agent1.channels.channel1.type = memory
>>>> agent1.channels.channel1.capacity = 1000
>>>> agent1.channels.channel1.transactionCapacity = 100
>>>>
>>>> # Add a channel which uses JDBC to buffer messages
>>>> agent1.channels.jdbcChannel-1.type = jdbc
>>>>
>>>> ########################################
>>>> # Sinks
>>>> ########################################
>>>> agent1.sinks = sink1 fileSink-1
>>>>
>>>> # Describe sink1, a simple console Logger Sink
>>>> agent1.sinks.sink1.type = logger
>>>>
>>>> #Describe fileSink-1, which writes to a file
>>>> agent1.sinks.fileSink-1.type = FILE_ROLL
>>>> agent1.sinks.fileSink-1.sink.directory = c:/flume/logs
>>>> agent1.sinks.fileSink-1.sink.rollInterval = 3600
>>>> #agent1.sinks.fileSink-1.sink.serializer = org.apache.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
>>>> agent1.sinks.fileSink-1.sink.serializer = AVRO_EVENT
>>>> agent1.sinks.fileSink-1.sink.serializer.compressionCodec = snappy
>>>>
>>>> ########################################
>>>> # Bindings
>>>> ########################################
>>>> # Bind the source and sink to the channel
>>>> agent1.sources.source1.channels = channel1 jdbcChannel-1
>>>> agent1.sinks.sink1.channel = channel1
>>>> agent1.sinks.fileSink-1.channel = jdbcChannel-1
>>>>
>>>>
>>>> --
>>>> Thom DeCarlo
>>>>
>>>
>>>
>>>
>>> --
>>> Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
