flink-user mailing list archives

From Boris Lublinsky <boris.lublin...@lightbend.com>
Subject Custom Kryo serializer
Date Fri, 14 Jul 2017 13:50:22 GMT
Hi,
I have several implementations of my Model trait:

trait Model {
  def score(input : AnyVal) : AnyVal
  def cleanup() : Unit
  def toBytes() : Array[Byte]
  def getType : Long
}

None of them is serializable, but they are used in the state definition.
So I implemented a custom serializer:

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor


class ModelSerializerKryo extends Serializer[Model]{
  
  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
    * <p>
    * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be
    * called with the parent object to ensure it can be referenced by the child objects. Any
    * serializer that uses {@link Kryo} to read a child object may need to be reentrant.
    * <p>
    * This method should not be called directly, instead this serializer can be passed to
    * {@link Kryo} read methods that accept a serializer.
    *
    * @return May be null if {@link #getAcceptsNull()} is true. */
  
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {

    import ModelSerializerKryo._

    val mType = input.readLong().toInt
    // The payload is length-prefixed: Input.readByte() has no end-of-stream marker
    // (it throws on underflow) and -1 is a legitimate byte value, so reading
    // "until -1" cannot find the end of the model bytes reliably.
    val bytes = input.readBytes(input.readInt())
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
    * <p>
    * This method should not be called directly, instead this serializer can be passed to
    * {@link Kryo} write methods that accept a serializer.
    *
    * @param value May be null if {@link #getAcceptsNull()} is true. */

  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    val bytes = value.toBytes
    output.writeLong(value.getType)
    output.writeInt(bytes.length) // length prefix so read() knows where the payload ends
    output.writeBytes(bytes)
  }
}

object ModelSerializerKryo{
  private val factories = Map(ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}
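
The wire format used by the serializer (a type tag followed by the model bytes) can be exercised outside Flink. Kryo's Input.readByte() does not return -1 at the end of the data (it throws on buffer underflow, and -1 is also a legitimate byte value), so the reader needs another way to know where the model bytes end. Below is a minimal sketch of one option, a length prefix, using only Kryo's Input and Output classes. The names FramingSketch, writeFramed and readFramed are made up for illustration and are not part of the original code.

```scala
import com.esotericsoftware.kryo.io.{Input, Output}

object FramingSketch {

  // Writes the type tag, then the payload length, then the payload itself.
  def writeFramed(output: Output, modelType: Long, payload: Array[Byte]): Unit = {
    output.writeLong(modelType)
    output.writeInt(payload.length)
    output.writeBytes(payload)
  }

  // Reads back exactly the bytes that writeFramed produced and nothing more.
  def readFramed(input: Input): (Long, Array[Byte]) = {
    val modelType = input.readLong()
    val length = input.readInt()
    (modelType, input.readBytes(length))
  }

  def main(args: Array[String]): Unit = {
    val output = new Output(64, -1)            // growable buffer, no size limit
    val payload = Array[Byte](1, -1, 2)        // contains -1 on purpose
    writeFramed(output, 7L, payload)
    output.writeInt(99)                        // unrelated data following the record

    val input = new Input(output.toBytes)
    val (modelType, restored) = readFramed(input)
    println(s"type=$modelType, bytes=${restored.mkString(",")}, next=${input.readInt()}")
  }
}
```

The trailing integer is there to show that the reader stops at the end of its own record instead of consuming whatever follows it in the stream.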

To configure it, I added the following:

// Add custom serializer
env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])

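A default Kryo serializer is only consulted for types that Flink itself hands over to Kryo, so it may be worth checking how Flink classifies the Model type. The following is a rough diagnostic sketch, not a confirmed explanation of the behaviour described here. It assumes the Flink 1.x API (TypeInformation.of, GenericTypeInfo, and createSerializer taking an ExecutionConfig). The Model trait is a stand-in so that the snippet compiles on its own, and TypeInfoCheck and isGenericType are hypothetical names.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo

// Stand-in for the Model trait from the message, reduced to what the check needs.
trait Model {
  def toBytes(): Array[Byte]
  def getType: Long
}

object TypeInfoCheck {

  // True when Flink's type extraction classifies the class as a generic type,
  // which is the case that is handed to Kryo by default.
  def isGenericType[T](clazz: Class[T]): Boolean =
    TypeInformation.of(clazz).isInstanceOf[GenericTypeInfo[_]]

  def main(args: Array[String]): Unit = {
    val config = new ExecutionConfig
    val info = TypeInformation.of(classOf[Model])
    // Print what Flink would actually use for this type.
    println(s"type information: $info")
    println(s"serializer class: ${info.createSerializer(config).getClass.getName}")
    println(s"generic (Kryo) type: ${isGenericType(classOf[Model])}")
  }
}
```

If the state is declared with a different type, or with explicit type information, the same check would need to be run against that type instead.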
I can see checkpoint messages in the console output, but I never hit a breakpoint in the serializer.
Any suggestions?
 



Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/

