flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Custom Kryo serializer
Date Mon, 24 Jul 2017 14:27:59 GMT
Copy of a mail I sent to the user mailing list only:

Raw state can only be used when implementing an operator, not a function.

For functions you have to use Managed Operator State. Your function will
have to implement the CheckpointedFunction interface and create a
ValueStateDescriptor that you register in initializeState.
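
A minimal sketch of what that could look like, assuming the model is kept
as the byte array returned by toBytes so that no custom serializer is
needed. ModelHolder and the state name "model-bytes" are hypothetical names
used only for illustration. The sketch registers a ListStateDescriptor,
because as far as I know the operator state store only hands out list
state; a ValueStateDescriptor would instead be registered through
context.getKeyedStateStore on a keyed stream.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import scala.collection.JavaConverters._

// Hypothetical function that keeps one serialized model as managed operator state.
class ModelHolder extends MapFunction[Double, Double] with CheckpointedFunction {

  // Working copy of the model bytes, held in a plain field between checkpoints.
  private var modelBytes: Option[Array[Byte]] = None

  // Handle to the managed state; assigned in initializeState.
  @transient private var checkpointed: ListState[Array[Byte]] = _

  // Scoring is omitted; the sketch only shows the state handling.
  override def map(value: Double): Double = value

  // Called on every checkpoint: copy the working field into managed state.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointed.clear()
    modelBytes.foreach(bytes => checkpointed.add(bytes))
  }

  // Called on start and on restore: register the descriptor, then read back.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor =
      new ListStateDescriptor[Array[Byte]]("model-bytes", classOf[Array[Byte]])
    checkpointed = context.getOperatorStateStore.getListState(descriptor)
    if (context.isRestored) {
      modelBytes = checkpointed.get().asScala.headOption
    }
  }
}
```

On restore you would rebuild the Model from the bytes with whatever
factory matches its type.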

On 24.07.2017 16:26, Boris Lublinsky wrote:
> Is there a chance this can be answered?
>
> Boris Lublinsky
> FDP Architect
> boris.lublinsky@lightbend.com <mailto:boris.lublinsky@lightbend.com>
> https://www.lightbend.com/
>
>> Begin forwarded message:
>>
>> *From: *Boris Lublinsky <boris.lublinsky@lightbend.com 
>> <mailto:boris.lublinsky@lightbend.com>>
>> *Subject: **Re: Custom Kryo serializer*
>> *Date: *July 19, 2017 at 8:28:16 AM CDT
>> *To: *user@flink.apache.org <mailto:user@flink.apache.org>, 
>> chesnay@apache.org <mailto:chesnay@apache.org>
>>
>> Thanks for the reply, but I am using it for the raw state rather than
>> for managed state.
>> In my implementation I have the following:
>>
>> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {
>>
>>    // The managed keyed state, see
>>    // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>>    var modelState: ValueState[ModelToServeStats] = _
>>    var newModelState: ValueState[ModelToServeStats] = _
>>    // The raw state, see
>>    // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>>    var currentModel: Option[Model] = None
>>    var newModel: Option[Model] = None
>>
>> where currentModel and newModel are instances of the trait for which I
>> implemented the serializer.
>> According to the documentation
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>>
>> “/Raw State/ is state that operators keep in their own data 
>> structures. When checkpointed, they only write a sequence of bytes 
>> into the checkpoint. Flink knows nothing about the state’s data 
>> structures and sees only the raw bytes.”
>>
>> So I was assuming that I need to provide a serializer for this.
>> Am I missing something?
>>
>>
>>
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublinsky@lightbend.com <mailto:boris.lublinsky@lightbend.com>
>> https://www.lightbend.com/
>>
>>>
>>> ---------- Forwarded message ----------
>>> From: *Chesnay Schepler* <chesnay@apache.org 
>>> <mailto:chesnay@apache.org>>
>>> Date: Wed, Jul 19, 2017 at 1:34 PM
>>> Subject: Re: Custom Kryo serializer
>>> To: user@flink.apache.org <mailto:user@flink.apache.org>
>>>
>>>
>>> Hello,
>>>
>>> I assume you're passing the class of your serializer in a 
>>> StateDescriptor constructor.
>>>
>>> If so, you could add a breakpoint in
>>> StateDescriptor#initializeSerializerUnlessSet,
>>> and check what typeInfo is created and which serializer is created
>>> as a result.
>>>
>>> One thing you could try right away is registering your serializer 
>>> for the Model implementations,
>>> instead of the trait.
>>>
>>> Regards,
>>> Chesnay
>>>
>>>
>>> On 14.07.2017 15:50, Boris Lublinsky wrote:
>>>> Hi
>>>> I have several implementations of my Model trait,
>>>>
>>>> trait Model {
>>>>    def score(input: AnyVal): AnyVal
>>>>    def cleanup(): Unit
>>>>    def toBytes(): Array[Byte]
>>>>    def getType: Long
>>>> }
>>>>
>>>> None of them is serializable, but they are used in the state
>>>> definition.
>>>> So I implemented a custom serializer:
>>>>
>>>> import com.esotericsoftware.kryo.io.{Input, Output}
>>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>>>>
>>>>
>>>> class ModelSerializerKryo extends Serializer[Model] {
>>>>
>>>>    super.setAcceptsNull(false)
>>>>    super.setImmutable(true)
>>>>
>>>>    /** Reads bytes and returns a new object of the specified concrete type.
>>>>      * <p>
>>>>      * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)}
>>>>      * must be called with the parent object to ensure it can be referenced by the
>>>>      * child objects. Any serializer that uses {@link Kryo} to read a child object
>>>>      * may need to be reentrant.
>>>>      * <p>
>>>>      * This method should not be called directly, instead this serializer can be
>>>>      * passed to {@link Kryo} read methods that accept a serializer.
>>>>      *
>>>>      * @return May be null if {@link #getAcceptsNull()} is true.
>>>>      */
>>>>    override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>>>>
>>>>      import ModelSerializerKryo._
>>>>
>>>>      val mType = input.readLong().asInstanceOf[Int]
>>>>      val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
>>>>      factories.get(mType) match {
>>>>        case Some(factory) => factory.restore(bytes)
>>>>        case _ => throw new Exception(s"Unknown model type $mType to restore")
>>>>      }
>>>>    }
>>>>
>>>>    /** Writes the bytes for the object to the output.
>>>>      * <p>
>>>>      * This method should not be called directly, instead this serializer can be
>>>>      * passed to {@link Kryo} write methods that accept a serializer.
>>>>      *
>>>>      * @param value May be null if {@link #getAcceptsNull()} is true.
>>>>      */
>>>>    override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>>>      output.writeLong(value.getType)
>>>>      output.write(value.toBytes)
>>>>    }
>>>> }
>>>>
>>>> object ModelSerializerKryo {
>>>>    private val factories = Map(
>>>>      ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>>>      ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>>>> }
>>>> And added the following
>>>>
>>>> // Add custom serializer
>>>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>>>>
>>>> To configure it.
>>>> I can see checkpoint messages on the output console, but I never
>>>> hit a breakpoint in the serializer.
>>>> Any suggestions?
>>>>
>>>>
>>>>
>>>> Boris Lublinsky
>>>> FDP Architect
>>>> boris.lublinsky@lightbend.com <mailto:boris.lublinsky@lightbend.com>
>>>> https://www.lightbend.com/
>>>>
>>>
>>>
>>
>
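
A side note on the quoted serializer, separate from the breakpoint
question: read collects bytes until it sees -1, but -1 is a legal value
for a signed byte inside the payload, and as far as I know Kryo's
Input.readByte reports end of input with an exception rather than by
returning -1. Writing the payload length in front of the payload avoids
both problems. Below is a sketch of that framing using plain java.io
streams so it runs without Kryo on the classpath; LengthPrefixedFraming is
a hypothetical name. With Kryo the corresponding calls should be
output.writeInt(bytes.length) followed by output.writeBytes(bytes) on the
write side, and input.readBytes(input.readInt()) on the read side.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper showing length-prefixed framing of a model payload.
object LengthPrefixedFraming {

  // Layout: 8-byte type tag, 4-byte payload length, then the payload bytes.
  def write(modelType: Long, payload: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeLong(modelType)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    buffer.toByteArray
  }

  // Reads exactly `length` payload bytes instead of scanning for a sentinel.
  def read(bytes: Array[Byte]): (Long, Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val modelType = in.readLong()
    val length = in.readInt()
    val payload = new Array[Byte](length)
    in.readFully(payload)
    (modelType, payload)
  }
}
```

A payload such as Array[Byte](1, -1, 2) survives the round trip with this
layout, whereas a reader that stops at the first -1 would truncate it
after the first byte.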

