avro-user mailing list archives

From Sunita Arvind <sunitarv...@gmail.com>
Subject Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open
Date Wed, 27 Jul 2016 17:10:35 GMT
For the benefit of anyone else hitting the same issue, here is what I found:

The serializer I was using extends AbstractAvroEventSerializer. That class is
widely used, so the problem is unlikely to be in the abstract class itself.
I got rid of the issue by overriding AbstractAvroEventSerializer's configure
method in my custom serializer, as below:


public void configure(Context context) {
    int syncIntervalBytes = context.getInteger("syncIntervalBytes", 2048000);
    String compressionCodec = context.getString("compressionCodec", "null");
    // schema and out are this serializer's schema and output stream
    this.writer = new ReflectDatumWriter(this.getSchema());
    this.dataFileWriter = new DataFileWriter(this.writer);
    this.dataFileWriter.setSyncInterval(syncIntervalBytes);
    try {
        CodecFactory codecFactory = CodecFactory.fromString(compressionCodec);
        this.dataFileWriter.setCodec(codecFactory);
        this.dataFileWriter.create(schema, out); // added: open the writer here
    } catch (AvroRuntimeException e) {
        logger.warn("Unable to instantiate avro codec with name ("
                + compressionCodec + "). Compression disabled. Exception follows.", e);
    } catch (IOException io) {
        logger.warn("Could not open dataFileWriter. Exception follows.", io);
    }
}

From the stack trace, the "not open" error is thrown by
DataFileWriter.assertOpen() because append() was being called before the
writer had been opened with create(); opening it in configure() avoids that.
After this change, the files are created in HDFS correctly, and I was also
able to read them in Spark using the spark-avro package.
Hope this is the right way to do it and that the solution helps someone.
I would love to hear if anyone in the Avro or Flume community knows of a
better way to do it.
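
In case it is useful, here is a minimal sketch of how such a custom
serializer and its Builder hang together, assuming a GenericRecord payload.
All class, field and schema names below are illustrative placeholders, not
the actual RawAvroHiveSerializer:

import java.io.IOException;
import java.io.OutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.serialization.AbstractAvroEventSerializer;
import org.apache.flume.serialization.EventSerializer;

// Illustrative skeleton only; shown to make clear where the overridden
// configure(Context) from above would live and how the HDFS sink hands the
// serializer its output stream via the Builder.
public class MyAvroEventSerializer extends AbstractAvroEventSerializer<GenericRecord> {

    private final OutputStream out;
    private final Schema schema;

    private MyAvroEventSerializer(OutputStream out, Schema schema) {
        this.out = out;
        this.schema = schema;
    }

    @Override
    protected OutputStream getOutputStream() {
        return out;
    }

    @Override
    protected Schema getSchema() {
        return schema;
    }

    @Override
    protected GenericRecord convert(Event event) {
        // The event body is the binary-encoded Avro record produced upstream.
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
            return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        } catch (IOException e) {
            throw new FlumeException("Failed to decode Avro event body", e);
        }
    }

    // ... the overridden configure(Context) shown above goes here ...

    public static class Builder implements EventSerializer.Builder {
        @Override
        public EventSerializer build(Context context, OutputStream out) {
            // The schema would come from wherever the pipeline keeps it; a
            // trivial placeholder is hard-coded here.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
                + "[{\"name\":\"message\",\"type\":\"string\"}]}");
            MyAvroEventSerializer serializer = new MyAvroEventSerializer(out, schema);
            serializer.configure(context);
            return serializer;
        }
    }
}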

regards
Sunita


On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <sunitarvind@gmail.com>
wrote:

> Hello Experts,
>
> I am trying to convert a custom data source received in Flume into Avro
> and push it to HDFS. The flow I am attempting is:
> syslog -> flume source -> flume interceptor that converts the event into
> avroObject.toByteArray -> hdfs sink serializer that decodes the byteArray
> back to Avro
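>
> For illustration, binary-encoding a record into a byte[] in the interceptor
> could look like this sketch (names are placeholders, not the actual
> CustomToAvroConvertInterceptor):
>
>     // Illustrative helper: binary-encode a GenericRecord into a byte[]
>     // that is then set as the Flume event body with event.setBody(...).
>     // This is the "avroObject.toByteArray" step mentioned above.
>     private byte[] toByteArray(GenericRecord record, Schema schema) throws IOException {
>         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
>         new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
>         encoder.flush();
>         return bytes.toByteArray();
>     }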
>
> The flume configuration looks like:
>
> tier1.sources.syslogsource.interceptors.i2.type=timestamp
> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
> tier1.sources.syslogsource.interceptors.i1.type = com.flume.CustomToAvroConvertInterceptor$Builder
>
> #hdfs sink for archival and batch analysis
> tier1.sinks.hdfssink.type = hdfs
> tier1.sinks.hdfssink.hdfs.writeFormat = Text
> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>
> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>
> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
> # roll file if it's been 10 * 60 seconds = 600
> tier1.sinks.hdfssink.hdfs.rollInterval=600
> # roll file if we get 50,000 log lines (~25MB)
> tier1.sinks.hdfssink.hdfs.rollCount=0
> tier1.sinks.hdfssink.hdfs.batchSize = 100
> tier1.sinks.hdfssink.hdfs.rollSize=0
> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
> tier1.sinks.hdfssink.channel = hdfsmem
>
> When I use tier1.sinks.hdfssink.serializer=avro_event,
> the binary data stored in HDFS is the output of
> CustomToAvroConvertInterceptor.intercept(event.getBody()).toByteArray;
> however, this data cannot be parsed in Hive, so I see all nulls in the
> column values.
> Based on
> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
> all I am doing in RawAvroHiveSerializer.convert is to decode the byte array
> with a BinaryDecoder.
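>
> For illustration, the byte-array decode described in that FAQ entry could
> look like this sketch (schema/reader setup simplified, IOException handling
> omitted; not the actual RawAvroHiveSerializer code):
>
>     // Turn the event body (a binary-encoded Avro record) back into a record.
>     // The null arguments are the optional "reuse" parameters.
>     DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
>     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
>     GenericRecord record = reader.read(null, decoder);
>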
> The exception I get seems unrelated to the code itself, hence I am pasting
> only the stack trace; I will share the code if it is required to identify
> the root cause:
>
> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: org.apache.avro.AvroRuntimeException: not open
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.avro.AvroRuntimeException: not open
>         at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>         at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>         at org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>         at org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>         at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>         at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>
> I can reproduce this on the local file system as well. In the test case, I
> tried opening the file with append=true and still encounter the same
> exception.
>
> Appreciate any guidance in this regard.
>
> regards
> Sunita
>
