avro-user mailing list archives

From Sunita Arvind <sunitarv...@gmail.com>
Subject Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open
Date Thu, 28 Jul 2016 16:27:23 GMT
Nicolas,

For my use case, I have the schema, so I used the Maven plugin to generate
the Avro classes from it. However, for your use case, where you do not have
the schema and just have a .avro file, this looks like an easy way to get it:

java -jar ~/avro-tools-1.7.7.jar getschema twitter.avro > twitter.avsc

Found it here - https://github.com/miguno/avro-cli-examples

Another option, since you are not using Java, is the following (on a Mac):
1. ruby -e "$(curl -fsSL
https://raw.githubusercontent.com/Homebrew/install/master/install)" <
/dev/null 2> /dev/null
2. brew install avro-tools
3. avro-tools getschema file.avro

Typing avro-tools by itself should print the usage details, which might get
you further. Hope this helps.
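
For reference, doing the same thing programmatically from Java is only a few
lines; here is a rough, untested sketch (the file name is just an example):

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class PrintSchema {
    public static void main(String[] args) throws IOException {
        File avroFile = new File("twitter.avro"); // example path
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
                avroFile, new GenericDatumReader<GenericRecord>());
        try {
            Schema schema = reader.getSchema();        // writer schema from the file header
            System.out.println(schema.toString(true)); // same output as getschema
            for (GenericRecord record : reader) {      // the data blocks follow the header
                System.out.println(record);
            }
        } finally {
            reader.close();
        }
    }
}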

regards
Sunita

On Thu, Jul 28, 2016 at 6:29 AM, Nicolas Ranc <rnc.nicolas@gmail.com> wrote:

> Dear Sunita,
>
> Thank you for your fast answer.
>
> It's not exactly what I expected.
> I am using Apache Avro C++ and I would like to deserialize a .avro file
> ("avroProgram2.avro"). With just that one .avro file and the Avro C++
> functions, I am trying to extract the schema, the keys in the schema, and
> the data at the end of the file.
>
> I saw different functions: for example, in the file resolving.cc from
> http://avro.apache.org/docs/1.7.6/api/cpp/html/, the program uses load(...)
> to import the schema and then uses it in the avro::resolvingDecoder.
> In my case, I cannot import such a schema for deserialization: I only have
> the .avro file, and I am looking for functions to extract the information
> from that file (the schema, the keys and the values). I need this
> information because I then have to create new objects in Matlab, using Mex
> functions (in C++).
>
> Thank you for your time,
> Nicolas Ranc
>
> 2016-07-27 19:10 GMT+02:00 Sunita Arvind <sunitarvind@gmail.com>:
>
>> For the benefit of anyone else hitting the same issue, here is what I found:
>>
>> The serializer I was using extends AbstractAvroEventSerializer. This class
>> has a lot of adoption, so the issue is not likely to be in the abstract
>> class itself. However, I got rid of the issue by overriding the configure
>> method of AbstractAvroEventSerializer in my custom serializer, as below:
>>
>>
>> @Override
>> public void configure(Context context) {
>>     int syncIntervalBytes = context.getInteger("syncIntervalBytes", 2048000);
>>     String compressionCodec = context.getString("compressionCodec", "null");
>>     this.writer = new ReflectDatumWriter(this.getSchema());
>>     this.dataFileWriter = new DataFileWriter(this.writer);
>>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>>     try {
>>         CodecFactory codecFactory = CodecFactory.fromString(compressionCodec);
>>         this.dataFileWriter.setCodec(codecFactory);
>>         this.dataFileWriter.create(schema, out); // added the creation, so append() finds the writer open
>>     } catch (AvroRuntimeException e) {
>>         logger.warn("Unable to instantiate avro codec with name (" + compressionCodec
>>                 + "). Compression disabled. Exception follows.", e);
>>     } catch (IOException io) {
>>         logger.warn("Could not open dataFileWriter. Exception follows.", io);
>>     }
>> }
>>
>> After this, the files are getting created in HDFS just right.
>> I was also able to view the files in Spark using the spark-avro package.
>> Hope this is the right way to do it and that the solution helps someone.
>> Would love to hear if anyone in the Avro or Flume community knows of a
>> better way to do it.
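>>
>> For reference, reading one of the output directories back looked roughly like
>> the sketch below (from memory, so treat it as approximate; it assumes Spark 1.x
>> with the Databricks spark-avro package on the classpath, and the partition
>> values in the path are made-up examples matching the sink's hdfs.path pattern):
>>
>> import org.apache.spark.SparkConf;
>> import org.apache.spark.api.java.JavaSparkContext;
>> import org.apache.spark.sql.DataFrame;
>> import org.apache.spark.sql.SQLContext;
>>
>> public class ReadRawAvro {
>>     public static void main(String[] args) {
>>         SparkConf conf = new SparkConf().setAppName("read-raw-avro");
>>         JavaSparkContext sc = new JavaSparkContext(conf);
>>         SQLContext sqlContext = new SQLContext(sc);
>>
>>         DataFrame events = sqlContext.read()
>>                 .format("com.databricks.spark.avro")
>>                 .load("/hive/rawavro/customer_id=1234/date=20160726/hr=19");
>>         events.printSchema();   // fields should match the serializer's schema
>>         events.show(10);
>>         sc.stop();
>>     }
>> }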
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <sunitarvind@gmail.com>
>> wrote:
>>
>>> Hello Experts,
>>>
>>> I am trying to convert a custom data source received in Flume into Avro
>>> and push it to HDFS. What I am attempting is:
>>> syslog -> Flume -> Flume interceptor that converts the event into
>>> avroObject.toByteArray() -> HDFS serializer that decodes the byte array
>>> back into Avro.
>>>
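>>> The interceptor side produces the raw Avro bytes for the event body roughly
>>> along these lines (a rough sketch, not my exact code; MyRecord is a
>>> placeholder for the generated Avro class):
>>>
>>> // Needs java.io.ByteArrayOutputStream, org.apache.avro.io.BinaryEncoder,
>>> // org.apache.avro.io.EncoderFactory and org.apache.avro.specific.SpecificDatumWriter.
>>> private byte[] toAvroBytes(MyRecord record) throws IOException {
>>>     ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>>     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
>>>     SpecificDatumWriter<MyRecord> datumWriter =
>>>             new SpecificDatumWriter<MyRecord>(MyRecord.getClassSchema());
>>>     datumWriter.write(record, encoder);
>>>     encoder.flush();
>>>     return bos.toByteArray();   // raw Avro bytes, no file container around them
>>> }
>>>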
>>> The flume configuration looks like:
>>>
>>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>>> tier1.sources.syslogsource.interceptors.i1.type = com.flume.CustomToAvroConvertInterceptor$Builder
>>>
>>> #hdfs sink for archival and batch analysis
>>> tier1.sinks.hdfssink.type = hdfs
>>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>>
>>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>>
>>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>>> # roll file if it's been 10 * 60 seconds = 600
>>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>>> # roll file if we get 50,000 log lines (~25MB)
>>> tier1.sinks.hdfssink.hdfs.rollCount=0
>>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>>> tier1.sinks.hdfssink.hdfs.rollSize=0
>>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>>> tier1.sinks.hdfssink.channel = hdfsmem
>>>
>>> When I use tier1.sinks.hdfssink.serializer=avro_event, I get binary data
>>> stored in HDFS, which is the output of
>>> CustomToAvroConvertInterceptor.intercept(event.getBody()).toByteArray();
>>> however, this data cannot be parsed in Hive. As a result, I see all nulls
>>> in the column values.
>>> Based on
>>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray?
>>> all I am doing in RawAvroHiveSerializer.convert is decoding with a
>>> BinaryDecoder.
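>>> Roughly, that decode looks like the sketch below (again approximate, not my
>>> exact code; MyRecord is a placeholder for the generated class):
>>>
>>> // Needs org.apache.avro.io.BinaryDecoder, org.apache.avro.io.DecoderFactory
>>> // and org.apache.avro.specific.SpecificDatumReader.
>>> private BinaryDecoder decoder = null;   // reused between events
>>> private final SpecificDatumReader<MyRecord> reader =
>>>         new SpecificDatumReader<MyRecord>(MyRecord.getClassSchema());
>>>
>>> private MyRecord fromAvroBytes(byte[] body) throws IOException {
>>>     decoder = DecoderFactory.get().binaryDecoder(body, decoder);
>>>     return reader.read(null, decoder);
>>> }
>>>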
>>> The exception I get seems to be unrelated to the code itself, hence I am
>>> pasting the stack trace. I will share the code if it is required to
>>> identify the root cause:
>>>
>>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to
>>> deliver event. Exception follows.
>>> org.apache.flume.EventDeliveryException:
>>> org.apache.avro.AvroRuntimeException: not open
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>>         at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>>         at
>>> org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>>         at
>>> org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>>         at
>>> org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>>         at
>>> org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>>
>>> I can reproduce this on the local file system as well. In the test case, I
>>> tried opening the file with append=true and still encounter the same exception.
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita
>>>
>>
>>
>
