flink-user mailing list archives

From "Federico D'Ambrosio" <federico.dambro...@smartlab.ws>
Subject Re: The implementation of the RichSinkFunction is not serializable.
Date Mon, 28 Aug 2017 08:43:11 GMT
Hello everyone,

I solved my issue by passing the table descriptor as an Array[Byte]
constructor parameter instead of an explicit HTableDescriptor parameter.
This way I can rebuild the HTableDescriptor inside the open method of the
OutputFormat using the static method HTableDescriptor.parseFrom. In the end,
marking conf, table and connection as transient wouldn't have made any
difference.
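The workaround above can be sketched with the JDK alone, so it runs without Flink or HBase on the classpath. This is an illustration under stated assumptions, not the actual format from this thread: `Descriptor` is a hypothetical stand-in for `HTableDescriptor` (not `Serializable`, but convertible to and from bytes in the way `toByteArray` and `HTableDescriptor.parseFrom` are), `BytesBackedFormat` is a hypothetical, stripped-down stand-in for the OutputFormat, and the Java serialization round trip in `SerializationRoundTrip` only imitates the format being shipped to a task.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for HTableDescriptor: NOT Serializable, but it can be
// turned into bytes and parsed back (like toByteArray / parseFrom in HBase).
final class Descriptor(val tableName: String) {
  def toByteArray: Array[Byte] = tableName.getBytes(StandardCharsets.UTF_8)
}

object Descriptor {
  def parseFrom(bytes: Array[Byte]): Descriptor =
    new Descriptor(new String(bytes, StandardCharsets.UTF_8))
}

// Hypothetical, stripped-down format: the only constructor field is a
// serializable byte array; the descriptor is rebuilt in open() on the worker.
class BytesBackedFormat(descriptorBytes: Array[Byte]) extends Serializable {
  // Not part of the serialized state; null until open() runs.
  @transient private var descriptor: Descriptor = null

  def open(taskNumber: Int, numTasks: Int): Unit = {
    descriptor = Descriptor.parseFrom(descriptorBytes)
  }

  // Valid only after open() has been called.
  def tableName: String = descriptor.tableName
}

object SerializationRoundTrip {
  // Serializes and deserializes with plain Java serialization,
  // imitating the format being sent from the client to a task.
  def roundTrip[A <: Serializable](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

In the real format, the client would pass `tableDescriptor.toByteArray` to the constructor, and `open` would call `HTableDescriptor.parseFrom` on those bytes before using the admin and table APIs.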

Regards

2017-08-27 14:22 GMT+02:00 Federico D'Ambrosio <federico.dambrosio@smartlab.ws>:

> Hi,
>
> could you elaborate, please? Do you mean that marking conf, connection and
> table as transient wouldn't help because of the HTableDescriptor
> reference?
>
> 2017-08-27 12:44 GMT+02:00 Jörn Franke <jornfranke@gmail.com>:
>
>> It looks like everything in your case needs to be serializable. An
>> alternative would be to mark certain non-serializable fields as transient,
>> but as far as I can see this is not possible in your case.
>>
>> On 27. Aug 2017, at 11:02, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>>
>> Hi,
>>
>> I'm trying to write to HBase with writeUsingOutputFormat, using a custom
>> HBase output format inspired by this example
>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java>
>> in flink-hbase (mind you, I'm using Scala instead of Java), and I'm
>> encountering the error reported in the subject line.
>>
>> Now, the OutputFormat I'm using is the following:
>>
>> abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor, confPath: Path) extends OutputFormat[T] {
>>
>>   private val LOG = LoggerFactory.getLogger(this.getClass)
>>
>>   var conf : org.apache.hadoop.conf.Configuration = _
>>   var connection : Connection = _
>>   var table : Table = _
>>   var taskNumber : String = _
>>
>>   @throws[IOException]
>>   def configure(parameters: Configuration): Unit = {
>>     conf = HBaseConfiguration.create()
>>     conf.addResource(confPath.getPath)
>>     connection = ConnectionFactory.createConnection(conf)
>>   }
>>
>>
>>   @throws[IOException]
>>   def close(): Unit = {
>>     table.close()
>>
>>   }
>>
>>
>>   @throws[IOException]
>>   def open(taskNumber: Int, numTasks: Int): Unit = {
>>     this.taskNumber = String.valueOf(taskNumber)
>>     val admin = connection.getAdmin
>>
>>     if(!admin.tableExists(tableDescriptor.getTableName))
>>       admin.createTable(tableDescriptor)
>>
>>     table = connection.getTable(tableDescriptor.getTableName)
>>
>>   }
>> }
>>
>> which is extended by the concrete format I'm using, which implements the writeRecord method:
>>
>>
>> class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath : Path)
>>   extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)
>>
>> with BatchContainer being
>>
>> case class BatchContainer(batch: Iterable[(String, String, String, Int)]) extends Serializable
>>
>> I'd like to ask: what needs to be Serializable? As far as I can see,
>> conf, connection and table are not Serializable, so they are surely part
>> of the issue. Do the constructor parameters count as well, especially
>> tableDescriptor, which is not Serializable? Should all the methods
>> implemented from the OutputFormat interface use only Serializable
>> variables?
>>
>> Thank you for your attention,
>> Federico
>>
>>
>
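The question of what has to be Serializable can be illustrated with plain Java serialization, the mechanism Flink relies on to ship user functions and formats to the workers. Java serialization writes every non-transient field, and in Scala a plain constructor parameter becomes a field as soon as a method (such as `open`) refers to it, which is why `tableDescriptor` was the culprit. A field that is still `null` is written as `null` whatever its declared type, so `conf`, `connection` and `table`, which are only assigned in `configure`/`open`, cannot trigger the error; that matches the observation that marking them transient made no difference. The same applies to any other instance `val`, such as `LOG`. All class names below (`NotSerializableThing`, `CapturesParam`, `UsesParamOnlyInConstructor`, `NullVar`, `SerializabilityCheck`) are hypothetical and exist only for this sketch.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable type, standing in for HTableDescriptor.
final class NotSerializableThing(val name: String)

// The parameter is read inside a method, so Scala keeps it as a field
// and Java serialization tries (and fails) to write it.
class CapturesParam(thing: NotSerializableThing) extends Serializable {
  def open(): String = thing.name
}

// The parameter is only read in the constructor body, so only the String
// field `name` is stored; `thing` itself is not kept as a field.
class UsesParamOnlyInConstructor(thing: NotSerializableThing) extends Serializable {
  val name: String = thing.name
}

// Like conf/connection/table: declared with a non-serializable type, but
// serialization only fails once the field actually holds an instance.
class NullVar extends Serializable {
  var thing: NotSerializableThing = null
}

object SerializabilityCheck {
  // Returns true if Java serialization of `value` succeeds.
  def isSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Assigning a value to `NullVar.thing` before serializing makes the check fail, which corresponds to opening the connection on the client instead of in `configure`/`open`.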
