flink-user mailing list archives

From Boris Lublinsky <boris.lublin...@lightbend.com>
Subject Re: Custom Kryo serializer
Date Wed, 19 Jul 2017 13:28:16 GMT
Thanks for the reply, but I am using the serializer for raw state, not for managed state.
My implementation contains the following:

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double]{

  // The managed keyed state see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _
  // The raw state - https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel : Option[Model] = None
  var newModel : Option[Model] = None

Here currentModel and newModel are instances of the trait for which I implemented the serializer.
According to the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state):

“Raw State is state that operators keep in their own data structures. When checkpointed,
they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state’s
data structures and sees only the raw bytes.”

So I assumed that I need to provide a serializer for this.
Am I missing something?
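
As a rough illustration of what "a sequence of bytes" could look like for an Option[Model] field, here is a minimal sketch that uses only the JDK and no Flink or Kryo API. The names RawModelCodec and ConstantModel and the factory registry are hypothetical, and the Model trait is cut down to the two members the sketch needs. How these bytes would actually reach a checkpoint is outside the scope of the sketch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for the Model trait; only the members used here.
trait Model {
  def toBytes: Array[Byte]
  def getType: Long
}

// Toy implementation, for illustration only. Its payload deliberately
// contains -1 to show that no byte value can serve as an end marker.
final case class ConstantModel(value: Byte) extends Model {
  def toBytes: Array[Byte] = Array[Byte](value, -1, value)
  def getType: Long = 0L
}

object RawModelCodec {
  // Hypothetical registry: type tag -> function rebuilding a model from its payload.
  private val factories: Map[Int, Array[Byte] => Model] =
    Map(0 -> ((bytes: Array[Byte]) => ConstantModel(bytes(0))))

  // Layout: presence flag, then (if present) type tag, payload length, payload.
  def write(model: Option[Model]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    model match {
      case Some(m) =>
        val payload = m.toBytes
        out.writeBoolean(true)
        out.writeLong(m.getType)
        out.writeInt(payload.length)
        out.write(payload)
      case None =>
        out.writeBoolean(false)
    }
    out.flush()
    buffer.toByteArray
  }

  def read(bytes: Array[Byte]): Option[Model] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    if (!in.readBoolean()) None
    else {
      val mType = in.readLong().toInt
      val payload = new Array[Byte](in.readInt())
      in.readFully(payload) // read exactly the announced number of bytes
      val factory = factories.getOrElse(
        mType, throw new IllegalArgumentException(s"Unknown model type $mType to restore"))
      Some(factory(payload))
    }
  }
}
```

The length prefix is the design choice that matters here: the reader knows how many bytes belong to the model without scanning for a terminator, so arbitrary payload bytes survive the round trip.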





Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/

> 
> ---------- Forwarded message ----------
> From: Chesnay Schepler <chesnay@apache.org>
> Date: Wed, Jul 19, 2017 at 1:34 PM
> Subject: Re: Custom Kryo serializer
> To: user@flink.apache.org
> 
> 
> Hello,
> 
> I assume you're passing the class of your serializer in a StateDescriptor constructor.
> 
> If so, you could add a breakpoint in StateDescriptor#initializeSerializerUnlessSet,
> and check what typeInfo is created and which serializer is created as a result.
> 
> One thing you could try right away is registering your serializer for the Model implementations,
> instead of the trait.
> 
> Regards,
> Chesnay
> 
> 
> On 14.07.2017 15:50, Boris Lublinsky wrote:
>> Hi 
>> I have several implementations of my Model trait, 
>> 
>> trait Model {
>>   def score(input : AnyVal) : AnyVal
>>   def cleanup() : Unit
>>   def toBytes() : Array[Byte]
>>   def getType : Long
>> }
>> 
>> None of them is serializable, but they are used in the state definition.
>> So I implemented a custom serializer:
>> 
>> import com.esotericsoftware.kryo.io.{Input, Output}
>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>> 
>> 
>> class ModelSerializerKryo extends Serializer[Model]{
>>   
>>   super.setAcceptsNull(false)
>>   super.setImmutable(true)
>> 
>>   /** Reads bytes and returns a new object of the specified concrete type.
>>     * <p>
>>     * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)}
>>     * must be called with the parent object to ensure it can be referenced by the
>>     * child objects. Any serializer that uses {@link Kryo} to read a child object
>>     * may need to be reentrant.
>>     * <p>
>>     * This method should not be called directly, instead this serializer can be passed
>>     * to {@link Kryo} read methods that accept a serializer.
>>     *
>>     * @return May be null if {@link #getAcceptsNull()} is true. */
>>   
>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>> 
>>     import ModelSerializerKryo._
>> 
>>     val mType = input.readLong().toInt
>>     // Read the length prefix written by write(), then exactly that many bytes.
>>     // Scanning for -1 does not work: -1 is a valid payload byte, and Input
>>     // has no end-of-data marker.
>>     val bytes = input.readBytes(input.readInt())
>>     factories.get(mType) match {
>>       case Some(factory) => factory.restore(bytes)
>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>     }
>>   }
>> 
>>   /** Writes the bytes for the object to the output.
>>     * <p>
>>     * This method should not be called directly, instead this serializer can be passed
>>     * to {@link Kryo} write methods that accept a serializer.
>>     *
>>     * @param value May be null if {@link #getAcceptsNull()} is true. */
>>   
>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>     val bytes = value.toBytes
>>     output.writeLong(value.getType)
>>     output.writeInt(bytes.length) // length prefix so read() knows where the payload ends
>>     output.write(bytes)
>>   }
>> }
>> 
>> object ModelSerializerKryo{
>>   private val factories = Map(ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>> }
>> And added the following 
>> 
>> // Add custom serializer
>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>> 
>> To configure it.
>> I can see checkpoint messages in the console output, but I never hit a breakpoint in the serializer.
>> Any suggestions?
>>  
>> 
>> 
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublinsky@lightbend.com
>> https://www.lightbend.com/
> 
> 

