From: Dawid Wysakowicz <dwysakowicz@apache.org>
To: Jeff Zhang, "Ly, The Anh"
Cc: Yun Tang, "user@flink.apache.org"
Date: Thu, 8 Nov 2018 11:53:10 +0100
Subject: Re: Starting a separate Java process within a Flink cluster

Hi,

I am afraid that what you are trying to do would be extremely hard. In a cluster setup, not all dependencies are taken from the TaskManager classpath: the user-code classes are loaded dynamically. Therefore they cannot be accessed from your new process, which does not have access to those user classes.
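To observe the effect described above, one can ask whether a class name is resolvable by the JVM's system (application) class loader, which reads `java.class.path` and therefore roughly matches what a child JVM started with `-cp` plus that property would see, and compare it with the loader the calling code actually uses. This is an illustrative sketch only: `ClasspathCheck` is a hypothetical helper, and the assumption that a Flink task thread's context class loader is the user-code class loader should be verified for your Flink version.

```java
/** Hypothetical diagnostic helper: which class loaders can resolve a given class name? */
public final class ClasspathCheck {

    /** Returns true if {@code loader} can find the class; the class is not initialized. */
    public static boolean resolvable(String binaryName, ClassLoader loader) {
        try {
            Class.forName(binaryName, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess";
        // The system class loader reads java.class.path: roughly what the child JVM would see.
        ClassLoader system = ClassLoader.getSystemClassLoader();
        // Assumption: inside a running task this is the loader that holds the user jar.
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println("system loader:  " + resolvable(name, system));
        if (context != null) {
            System.out.println("context loader: " + resolvable(name, context));
        }
    }
}
```

If the first line prints `false` and the second prints `true` when called from inside a sink or source, the child JVM cannot be expected to find the class via `java.class.path` alone.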

Best,

Dawid

On 02/11/2018 10:34, Jeff Zhang wrote:

The error is most likely due to a classpath issue, because the classpath is different when you run a Flink program in the IDE and when you run it in a cluster.

Also, starting another JVM process in a SourceFunction doesn't seem like a good approach to me. Is it possible for you to do this directly in your custom SourceFunction?
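One way to follow this suggestion is to keep the buffer inside the same JVM as the sink or source, for example as a bounded queue that a background thread drains. The sketch below is plain Java with no Flink dependency; `InProcessMessageBuffer` is a hypothetical name, and its capacity and timeout only mirror the `withHWM(20)` and `withTimeout(10000)` values from the original configuration (assuming the timeout is in milliseconds). Because such a buffer is loaded by the same class loader as the rest of the job, no separate classpath is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-JVM stand-in for the external message-buffer-process. */
public final class InProcessMessageBuffer<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMillis;

    /** @param highWaterMark maximum number of buffered messages before producers must wait */
    public InProcessMessageBuffer(int highWaterMark, long timeoutMillis) {
        this.queue = new ArrayBlockingQueue<>(highWaterMark);
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Waits up to the timeout while the buffer is at its high-water mark.
     * Returns false if the message could not be buffered in time or the thread was interrupted.
     */
    public boolean put(T message) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return false;
        }
    }

    /** Removes and returns up to maxMessages buffered messages without blocking. */
    public List<T> drain(int maxMessages) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxMessages);
        return batch;
    }

    public int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        InProcessMessageBuffer<String> buffer = new InProcessMessageBuffer<>(20, 10_000);
        // In a sink, invoke() could call put(...) and a thread started in open() could forward drain(...) batches.
        buffer.put("hello");
        buffer.put("world");
        System.out.println(buffer.drain(10)); // prints [hello, world]
    }
}
```

The bounded queue gives back-pressure at the high-water mark without a second process; whether that fits depends on whether the buffered data must outlive the task.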


On Fri, 2 Nov 2018 at 5:22 PM, Ly, The Anh <the.a.ly@campus.tu-berlin.de> wrote:
Yes, I did. It is definitely there. I also made a separate Maven project to test whether something was wrong with my jar.
The resulting shaded jar of that test project was fine, and the message-buffer-process ran with that test jar.


On 02.11.2018 04:47, Yun Tang <myasuka@live.com> wrote:
Hi

Since you use the message-buffer-process as a dependency and the error says the class was not found, have you checked whether your application jar actually contains the required MessageBufferProcess.class? If it does not, try the maven-assembly-plugin or the maven-shade-plugin to include your classes.
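This check can be done with the JDK's `jar tf` tool or programmatically. The sketch below uses `java.util.jar.JarFile`; the class name `JarContainsClass` is hypothetical. A class's entry in a jar uses its binary name with `/` separators, so a nested class appears as `Outer$Inner.class`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

/** Hypothetical helper: does a jar contain the .class entry for a given class? */
public final class JarContainsClass {

    /** Maps a binary class name (e.g. a.b.Outer$Inner) to its jar entry path (a/b/Outer$Inner.class). */
    public static String entryName(String binaryClassName) {
        return binaryClassName.replace('.', '/') + ".class";
    }

    /** Opens the jar read-only and looks up the entry for the class. */
    public static boolean contains(String jarPath, String binaryClassName) {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(binaryClassName)) != null;
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read " + jarPath, e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: JarContainsClass <application.jar> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(contains(args[0], args[1]));
    }
}
```

For example, passing the packaged word-count jar and `de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess` would print whether the shaded jar really includes that class.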

Best
Yun Tang

From: Ly, The Anh <the.a.ly@campus.tu-berlin.de>
Sent: Friday, November 2, 2018 6:33
To: user@flink.apache.org
Subject: Starting a separate Java process within a Flink cluster

Hello,


I am currently working on my master's, and I have run into a difficult problem.


Background (for context): I am trying to connect different data stream processors. To do so, I am using Flink's mechanisms for creating custom sinks and sources to receive data from and send data to the other data stream processors. In those custom sinks and sources I start a separate process (the message-buffer-process) to communicate with, and I buffer data in that message-buffer-process. My implementation is built with Maven, so it can be added as a dependency.


Problem: I have already tested my implementation by adding it as a dependency to a simple Flink word-count example. Running that test inside an IDE works perfectly fine. But when I package the Flink word-count example and try to run it with "./flink run ", or upload and submit it as a job, it tells me that my buffer-process class could not be found:

In German: "Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht gefunden oder geladen werden"

Roughly translated: "Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be found or loaded"


Code snippets:

Example - Adding my custom sink to send data to another data stream processor:

dataStream.addSink(
    (SinkFunction) DSPConnectorFactory
        .getInstance()
        .createSinkConnector(
            new DSPConnectorConfig.Builder("localhost", 9656)
                .withDSP("flink")
                .withBufferConnectorString("buffer-connection-string")
                .withHWM(20)
                .withTimeout(10000)
                .build()));



This is how I am trying to start the separate buffer process:
JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);

This is what JavaProcessBuilder.exec looks like:
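import java.io.File;
import java.io.IOException;

public final class JavaProcessBuilder {

    // Starts javaClass's main method in a new JVM that reuses this JVM's java binary and java.class.path.
    public static Process exec(Class<?> javaClass, String connectionString, boolean addSentMessagesFrame)
            throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        // Inside a Flink TaskManager this is the TaskManager's own classpath;
        // dynamically loaded user-code classes are not on it.
        String classpath = System.getProperty("java.class.path");
        // getName() returns the binary name (Outer$Inner) that the java launcher expects;
        // getCanonicalName() would return Outer.Inner for nested classes.
        String className = javaClass.getName();

        System.out.println("Trying to build process " + classpath + " " + className);

        ProcessBuilder builder = new ProcessBuilder(
                javaBin, "-cp", classpath, className, connectionString, Boolean.toString(addSentMessagesFrame));

        // Forward the child's stdout/stderr to this process's streams.
        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);

        Process process = builder.start();
        return process;
    }
}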
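Following Dawid's explanation, one possible adjustment is to put the jar (or directory) that `javaClass` was actually loaded from onto the child JVM's classpath, instead of relying only on `java.class.path`. This is a sketch under assumptions, not a verified fix: the user-code class loader must expose a `file:` code-source location for the class (which could be a TaskManager-local copy of the jar rather than the jar you submitted), and that jar must be self-contained (for example shaded) with everything `MessageBufferProcess` needs. `ChildJvmCommand` is a hypothetical name.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical variant of JavaProcessBuilder.exec that only builds the command line. */
public final class ChildJvmCommand {

    /** Returns the file-system path a class was loaded from, or null if unknown (e.g. JDK classes). */
    public static String codeSourcePath(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        URL location = source == null ? null : source.getLocation();
        if (location == null || !"file".equals(location.getProtocol())) {
            return null;
        }
        try {
            return Paths.get(location.toURI()).toString();
        } catch (URISyntaxException e) {
            return null;
        }
    }

    /** Builds: java -cp <origin of javaClass><sep><java.class.path> <binary class name> <args...> */
    public static List<String> build(Class<?> javaClass, String... args) {
        String javaBin = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
        String classpath = System.getProperty("java.class.path", "");
        String origin = codeSourcePath(javaClass);
        if (origin != null) {
            // Put the class's own jar first so its classes are found before other classpath entries.
            classpath = classpath.isEmpty() ? origin : origin + File.pathSeparator + classpath;
        }
        List<String> command = new ArrayList<>(Arrays.asList(javaBin, "-cp", classpath, javaClass.getName()));
        command.addAll(Arrays.asList(args));
        return command;
    }

    public static void main(String[] args) {
        // Prints the command instead of starting it; to launch: new ProcessBuilder(command).inheritIO().start()
        System.out.println(build(ChildJvmCommand.class, "buffer-connection-string", "true"));
    }
}
```

Even with a working classpath, the child JVM is not managed by Flink (it is not part of checkpoints and keeps running after the task is cancelled unless the code destroys it), which is one reason to prefer keeping the buffering inside the source or sink.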

I also tried running the message-buffer process on its own, from a separate Maven project and its packaged .jar file. That worked perfectly fine too, which is why I assume that my approach is not appropriate for running in Flink.
Did I miss something, or does my approach simply not work within Flink's context? I hope the information I gave you is sufficient to understand my issue. If you need any more information, feel free to message me!

Thanks for any help!

 With best regards

