flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Custom Kryo serializer
Date Wed, 19 Jul 2017 14:10:35 GMT
Raw state can only be used when implementing an operator, not a function.

For functions you have to use Managed Operator State. Your function will
have to implement the CheckpointedFunction interface and create a
ValueStateDescriptor that you register in initializeState.

On 19.07.2017 15:28, Boris Lublinsky wrote:
> Thanks for the reply, but I am not using it for managed state; I am
> using it for the raw state.
> In my implementation I have the following:
>
> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {
>
>   // The managed keyed state, see
>   // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>   var modelState: ValueState[ModelToServeStats] = _
>   var newModelState: ValueState[ModelToServeStats] = _
>   // The raw state, see
>   // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>   var currentModel: Option[Model] = None
>   var newModel: Option[Model] = None
>   // ... remaining members not shown
> }
>
> Here currentModel and newModel hold instances of the trait for which I
> implemented the serializer.
> According to the documentation at
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>
> “/Raw State/ is state that operators keep in their own data 
> structures. When checkpointed, they only write a sequence of bytes 
> into the checkpoint. Flink knows nothing about the state’s data 
> structures and sees only the raw bytes.”
>
> So I assumed that I need to provide a serializer for this.
> Am I missing something?
>
> Boris Lublinsky
> FDP Architect
> boris.lublinsky@lightbend.com
> https://www.lightbend.com/
>
>>
>> ---------- Forwarded message ----------
>> From: *Chesnay Schepler* <chesnay@apache.org>
>> Date: Wed, Jul 19, 2017 at 1:34 PM
>> Subject: Re: Custom Kryo serializer
>> To: user@flink.apache.org
>>
>>
>> Hello,
>>
>> I assume you're passing the class of your serializer in a 
>> StateDescriptor constructor.
>>
>> If so, you could add a breakpoint in
>> StateDescriptor#initializeSerializerUnlessSet
>> and check which typeInfo is created and which serializer is created
>> as a result.
>>
>> One thing you could try right away is registering your serializer for
>> the Model implementations instead of the trait.
>>
>> Regards,
>> Chesnay
>>
>>
>> On 14.07.2017 15:50, Boris Lublinsky wrote:
>>> Hi
>>> I have several implementations of my Model trait,
>>>
>>> trait Model {
>>>   def score(input: AnyVal): AnyVal
>>>   def cleanup(): Unit
>>>   def toBytes(): Array[Byte]
>>>   def getType: Long
>>> }
>>>
>>> None of them is serializable, but they are used in the state
>>> definition, so I implemented a custom serializer:
>>>
>>> import com.esotericsoftware.kryo.io.{Input, Output}
>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>>>
>>>
>>> class ModelSerializerKryo extends Serializer[Model] {
>>>
>>>   super.setAcceptsNull(false)
>>>   super.setImmutable(true)
>>>
>>>   /** Reads bytes and returns a new object of the specified concrete type.
>>>     * <p>
>>>     * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)}
>>>     * must be called with the parent object to ensure it can be referenced by the
>>>     * child objects. Any serializer that uses {@link Kryo} to read a child object
>>>     * may need to be reentrant.
>>>     * <p>
>>>     * This method should not be called directly, instead this serializer can be
>>>     * passed to {@link Kryo} read methods that accept a serializer.
>>>     *
>>>     * @return May be null if {@link #getAcceptsNull()} is true.
>>>     */
>>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>>>
>>>     import ModelSerializerKryo._
>>>
>>>     val mType = input.readLong().toInt
>>>     // Read exactly as many bytes as write() recorded; a -1 sentinel is not
>>>     // safe because -1 is a valid byte value inside the model payload.
>>>     val length = input.readInt()
>>>     val bytes = input.readBytes(length)
>>>     factories.get(mType) match {
>>>       case Some(factory) => factory.restore(bytes)
>>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>>     }
>>>   }
>>>
>>>   /** Writes the bytes for the object to the output.
>>>     * <p>
>>>     * This method should not be called directly, instead this serializer can be
>>>     * passed to {@link Kryo} write methods that accept a serializer.
>>>     *
>>>     * @param value May be null if {@link #getAcceptsNull()} is true.
>>>     */
>>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>>     output.writeLong(value.getType)
>>>     // Length prefix, so that read() knows where the model bytes end.
>>>     val bytes = value.toBytes
>>>     output.writeInt(bytes.length)
>>>     output.writeBytes(bytes)
>>>   }
>>> }
>>>
>>> object ModelSerializerKryo {
>>>   private val factories = Map(
>>>     ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>>> }
>>> And added the following to configure it:
>>>
>>> // Add custom serializer
>>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>>>
>>> I can see checkpoint messages in the console output, but I never
>>> hit a breakpoint in the serializer.
>>> Any suggestions?
>>>
>>>
>>
>>
>
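
A note on the byte framing in the quoted read/write pair: the model payload is written as raw bytes with no end marker, and a sentinel value such as -1 can legitimately occur inside a serialized model, so a length prefix is a safer way to delimit it. The following is a minimal, stdlib-only sketch of that framing (type tag, length, payload) using java.io.DataOutputStream and DataInputStream rather than Kryo; ModelFraming and its methods are hypothetical names, not part of Kryo or Flink.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper (not a Kryo or Flink API): frames a model as
// (type tag, payload length, payload bytes) so the reader knows exactly
// how many bytes belong to the model, whatever values those bytes hold.
object ModelFraming {

  def write(modelType: Long, payload: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeLong(modelType)     // type tag, used to pick a factory on read
    out.writeInt(payload.length) // explicit length instead of a sentinel byte
    out.write(payload)           // the raw model bytes
    out.flush()
    buffer.toByteArray
  }

  def read(bytes: Array[Byte]): (Long, Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val modelType = in.readLong()
    val payload = new Array[Byte](in.readInt())
    in.readFully(payload)        // reads exactly payload.length bytes
    (modelType, payload)
  }

  // Sentinel-based reading, for comparison: stops at the first -1 byte,
  // which silently truncates any payload that contains -1.
  def readUntilSentinel(payload: Array[Byte]): Array[Byte] =
    payload.takeWhile(_ != -1)
}
```

With Kryo's own Output and Input the same idea maps to output.writeInt(bytes.length) followed by output.writeBytes(bytes) when writing, and input.readBytes(input.readInt()) when reading.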

