From: Jeff Zhang
Date: Fri, 2 Nov 2018 02:34:56 -0700
Subject: Re: Starting a seperate Java process within a Flink cluster
To: "Ly, The Anh"
Cc: Yun Tang, "user@flink.apache.org"
References: <1541111594224.20641@campus.tu-berlin.de>

The error is most likely due to a classpath issue, because the classpath is different when you run a Flink program in the IDE and when you run it on a cluster.

Also, starting another JVM process in a SourceFunction doesn't seem like a good approach to me. Is it possible for you to do this work inside your custom SourceFunction instead?
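One way to check the classpath theory is to compare the JVM's `java.class.path` with the location a class was actually loaded from. The sketch below is only a diagnostic aid: `ClasspathProbe`, `locationOf` and `isOnJvmClasspath` are hypothetical names, not part of Flink or of the original project, and it assumes the class was loaded from a plain `file:` URL (otherwise it reports the location as unknown).

```java
import java.io.File;
import java.net.URISyntaxException;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.CodeSource;

/** Hypothetical diagnostic helper (an assumption for illustration only). */
final class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or null if it cannot be determined. */
    static Path locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return null; // e.g. JDK classes loaded by the bootstrap loader
        }
        try {
            return Paths.get(source.getLocation().toURI()).toAbsolutePath().normalize();
        } catch (URISyntaxException | IllegalArgumentException | FileSystemNotFoundException e) {
            return null; // not a plain file: URL
        }
    }

    /** True if the class's jar or directory is listed in this JVM's java.class.path. */
    static boolean isOnJvmClasspath(Class<?> clazz) {
        Path location = locationOf(clazz);
        if (location == null) {
            return false;
        }
        for (String entry : System.getProperty("java.class.path", "").split(File.pathSeparator)) {
            if (entry.isEmpty()) {
                continue;
            }
            try {
                if (Paths.get(entry).toAbsolutePath().normalize().equals(location)) {
                    return true;
                }
            } catch (InvalidPathException e) {
                // ignore entries that are not valid paths on this platform
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("java.class.path  = " + System.getProperty("java.class.path"));
        System.out.println("loaded from      = " + locationOf(ClasspathProbe.class));
        System.out.println("on JVM classpath = " + isOnJvmClasspath(ClasspathProbe.class));
    }
}
```

Calling `isOnJvmClasspath(MessageBufferProcess.class)` from inside the sink or source, once in the IDE and once on the cluster, would show whether a child JVM started with `-cp System.getProperty("java.class.path")` can see that class at all.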


On Fri, Nov 2, 2018 at 5:22 PM, Ly, The Anh <the.a.ly@campus.tu-berlin.de> wrote:
Yes, I did. It is definitely there. I created a separate Maven project to test whether something was wrong with my jar.
The resulting shaded jar of that test project was fine, and the message-buffer-process ran with that test jar.


On 02.11.2018 at 04:47, Yun Tang <myasuka@live.com> wrote:
Hi

Since you use the message-buffer-process as a dependency and the error tells you that the class was not found, have you checked whether your application jar actually contains MessageBufferProcess.class? If it does not, try using the maven-assembly or maven-shade plugin to include your classes.

Best
Yun Tang
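The check suggested above can also be done programmatically. The following is a minimal sketch using the JDK's `java.util.jar.JarFile`; the class name `JarClassCheck` and its methods are hypothetical helpers introduced for illustration, and the jar path is whatever your build produces.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

/** Hypothetical helper (an assumption for illustration only). */
final class JarClassCheck {

    /** Converts a fully qualified class name into the entry path used inside a jar. */
    static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath contains a .class entry for className. */
    static boolean jarContainsClass(String jarPath, String className) {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryNameFor(className)) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Usage: java JarClassCheck <path-to-jar> <fully.qualified.ClassName>
        System.out.println(jarContainsClass(args[0], args[1]));
    }
}
```

Running it against the packaged job jar with `de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess` as the second argument prints `true` only if the class was actually bundled.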

From: Ly, The Anh <the.a.ly@campus.tu-berlin.de>
Sent: Friday, November 2, 2018 6:33
To: user@flink.apache.org
Subject: Starting a seperate Java process within a Flink cluster

Hello,


I am currently working on my master's and have run into a difficult problem.


Background (for context): I am trying to connect different data stream processors. To do so, I am using Flink's built-in mechanisms for creating custom sinks and sources to receive from and send to other data stream processors. In those custom sinks and sources I start a separate process (message-buffer-process) to communicate with and buffer data in. My implementation is built with Maven and could be added as a dependency.


Problem: I already tested my implementation by adding it as a dependency to a simple Flink word-count example. The test ran inside an IDE and worked perfectly fine. But when I package that Flink word-count example and try to run it with "./flink run " or by uploading and submitting it as a job, it tells me that my buffer-process class could not be found:

In German: "Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht gefunden oder geladen werden"

Roughly translated: "Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be found or loaded"


Code snippets:

Example - Adding my custom sink to send data to another data stream processor:

dataStream.addSink(
    (SinkFunction)DSPConnectorFactory
        .getInstance()
        .createSinkConnector(
            new DSPConnectorConfig
                .Builder("localhost", 9656)
                .withDSP("flink")
                .withBufferConnectorString("buffer-connection-string")
                .withHWM(20)
                .withTimeout(10000)
                .build()));



The way I am trying to start the separate buffer-process: JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);
What JavaProcessBuilder.exec looks like:
public static Process exec(Class javaClass, String connectionString, boolean addSentMessagesFrame) throws IOException, InterruptedException {
    String javaHome = System.getProperty("java.home");
    String javaBin = javaHome +
            File.separator + "bin" +
            File.separator + "java";
    String classpath = System.getProperty("java.class.path");
    String className = javaClass.getCanonicalName();

    System.out.println("Trying to build process " + classpath + " " + className);

    ProcessBuilder builder = new ProcessBuilder(
            javaBin, "-cp", classpath, className, connectionString, Boolean.toString(addSentMessagesFrame));

    builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
    builder.redirectError(ProcessBuilder.Redirect.INHERIT);

    Process process = builder.start();
    return process;
}
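A possible variant of this helper, as an untested sketch: instead of relying only on `java.class.path`, it also appends the jar or directory the main class was loaded from. This rests on the assumption that on a cluster the job jar is loaded by a separate user-code classloader and is therefore missing from the parent JVM's `java.class.path`. `ChildJvmLauncher`, `childClasspath` and `buildCommand` are hypothetical names, and dependencies that live in other jars would still need to be added the same way.

```java
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Paths;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical launcher (an assumption for illustration only). */
final class ChildJvmLauncher {

    /** Parent's java.class.path plus the location mainClass was loaded from, if known. */
    static String childClasspath(Class<?> mainClass) {
        String classpath = System.getProperty("java.class.path", "");
        CodeSource source = mainClass.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return classpath; // location unknown, fall back to the parent's classpath
        }
        try {
            String location = Paths.get(source.getLocation().toURI()).toString();
            if (Arrays.asList(classpath.split(File.pathSeparator)).contains(location)) {
                return classpath; // already listed
            }
            return classpath.isEmpty() ? location : classpath + File.pathSeparator + location;
        } catch (URISyntaxException | IllegalArgumentException | FileSystemNotFoundException e) {
            return classpath; // not a plain file: URL, fall back
        }
    }

    /** Builds the command line: java -cp <classpath> <main class> <args...>. */
    static List<String> buildCommand(Class<?> mainClass, String... args) {
        List<String> command = new ArrayList<>();
        command.add(Paths.get(System.getProperty("java.home"), "bin", "java").toString());
        command.add("-cp");
        command.add(childClasspath(mainClass));
        // getName() yields the binary name the java launcher expects, also for nested classes
        command.add(mainClass.getName());
        command.addAll(Arrays.asList(args));
        return command;
    }

    /** Starts the child JVM and forwards its stdout and stderr to the parent. */
    static Process exec(Class<?> mainClass, String... args) throws IOException {
        ProcessBuilder builder = new ProcessBuilder(buildCommand(mainClass, args));
        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);
        return builder.start();
    }
}
```

With this sketch the call would become `ChildJvmLauncher.exec(MessageBufferProcess.class, connectionString, Boolean.toString(addSentMessagesFrame))`. Printing the result of `buildCommand` first makes it easy to see which classpath the child process really receives.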

I also tried running that message-buffer process separately in another Maven project and from its packaged .jar file. That worked perfectly fine too. That is why I am assuming that my approach is not appropriate for running in Flink.
Did I miss something, or does my approach simply not work within Flink's context? I hope the information I gave is sufficient to understand my issue. If you need any more information, feel free to message me!

Thanks for any help!

With best regards

