From: George Theodorakis
Date: Mon, 15 Jan 2018 12:19:10 +0000
Subject: Low throughput when trying to send data with Sockets
To: user@flink.apache.org

Hello,

I am trying to separate the logic of my application by generating and processing data in different physical engines.

I have created my custom socket source class:

class SocketSourceFunction extends SourceFunction[Event2] {
  @volatile private var isRunning: Boolean = true
  @transient private var serverSocket: ServerSocketChannel = null

  override def run(ctx: SourceContext[Event2]) = {
    val hostname = "localhost"
    val port = 6667
    println("listening:" + port)
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(hostname, port))
    var buffer = ByteBuffer.allocate(68)
    val des = new EventDeSerializer2()

    while (isRunning) {
      println("waiting...")
      var socketChannel = server.accept()

      if (socketChannel != null) {
        println("accept:" + socketChannel)
        while (true) {
          var bytes = 0
          bytes = socketChannel.read(buffer)
          if (bytes > 0) {
            if (!buffer.hasRemaining()) {
              buffer.rewind()
              var event: Event2 = des.deserialize(buffer.array())
              ctx.collect(event)
              buffer.clear()
            }
          }
        }
      }
    }
  }

  override def cancel() = {
    isRunning = false
    val socket = this.serverSocket

    if (socket != null) {
      try {
        socket.close()
      } catch {
        case e: Throwable => {
          System.err.println(String.format("error: %s", e.getMessage()))
          e.printStackTrace()
          System.exit(1)
        }
      }
    }
  }
}

I am sending data either with raw sockets using ByteBuffers or with a Flink generator (serializing my Events and using the writeToSocket() method). However, in both cases the throughput is more than 10x lower than with in-memory generation, even over a 10 Gbit connection, where the throughput is still much lower.

Is there any obvious defect in my implementation?
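To make the raw-socket case concrete, a sender of this kind could look roughly like the sketch below. It is an illustration under stated assumptions, not the exact generator used: the object name RawSocketSender, the helper encodeRecord, and the record layout (an 8-byte sequence number followed by zero padding) are hypothetical. Only the 68-byte record size and port 6667 come from the source function above.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel

// Hypothetical sender; names and record layout are assumptions for illustration.
object RawSocketSender {
  // Matches the 68-byte buffer allocated in the source function.
  val RecordSize = 68

  // Assumed layout: 8-byte sequence number, remaining bytes left as zeros.
  def encodeRecord(seq: Long): ByteBuffer = {
    val buf = ByteBuffer.allocate(RecordSize)
    buf.putLong(seq)
    // Reset the position to 0 so the full 68 bytes are written out.
    buf.rewind()
    buf
  }

  // Writes `count` fixed-size records to host:port over a blocking channel.
  def send(host: String, port: Int, count: Long): Unit = {
    val channel = SocketChannel.open(new InetSocketAddress(host, port))
    try {
      var seq = 0L
      while (seq < count) {
        val record = encodeRecord(seq)
        // write() may send fewer bytes than requested, so loop until drained.
        while (record.hasRemaining) channel.write(record)
        seq += 1
      }
    } finally {
      channel.close()
    }
  }
}
```

With the source function listening, such a sender would be started with something like RawSocketSender.send("localhost", 6667, 1000000L). Note that this sketch issues one write per 68-byte record, which mirrors the one-record-at-a-time read on the receiving side.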
Thank you in advance,
George