Hello,

I am trying to separate the logic of my application by generating and processing data in different physical engines. 

I have created my custom socket source class:

class SocketSourceFunction extends SourceFunction[Event2]{
      @volatile private var isRunning:Boolean = true;
      @transient private var serverSocket: ServerSocketChannel = null; 

      override def run(ctx: SourceContext[Event2]) = {
  val hostname = "localhost"
  val port = 6667
          println("listening:" + port)                
  val server = ServerSocketChannel.open();
  server.bind(new InetSocketAddress (hostname, port));     
  var buffer = ByteBuffer.allocate (68);
  val des = new EventDeSerializer2()
      
  while (isRunning) {
            println("waiting...")            
            var socketChannel = server.accept();

     if (socketChannel != null){
               println("accept:" + socketChannel)
               while (true) {
    var bytes = 0;
    bytes = socketChannel.read(buffer)
    if( bytes > 0) {
    if (!buffer.hasRemaining()) {
    buffer.rewind()
    var event: Event2 = des.deserialize(buffer.array())
    ctx.collect(event)
    buffer.clear()
    }
    }
                     }
}
          }          
      }

      override def cancel() = {
        isRunning = false;
        val socket = this.serverSocket; 
        if (socket != null) { 
          try { 
            socket.close(); 
           }catch { case e: Throwable => {  
              System.err.println(String.format("error: %s", e.getMessage()));
        e.printStackTrace();
        System.exit(1);
             }
           }
         } 
      }
}

I am sending data with either raw sockets using ByteBuffers or with a Flink generator (serializing my Events and using writeToSocket() method). However, in both cases, I am experiencing less than 10x throughput in comparison to in-memory generation, even when using a 10gbit connection (the throughput is much lower).

Is there any obvious defect in my implementation?

Thank you in advance,
George