flink-user mailing list archives

From "Ly, The Anh" <the.a...@campus.tu-berlin.de>
Subject Re: Starting a separate Java process within a Flink cluster
Date Fri, 02 Nov 2018 09:22:22 GMT
Yes, I did. It is definitely there. I also created a separate Maven project to test whether
something was wrong with my jar.
The resulting shaded jar of that test project was fine, and the message-buffer-process ran
with that test jar.

On 02.11.2018 04:47, Yun Tang <myasuka@live.com> wrote:

Since you use the message-buffer-process as a dependency and the error says the class was not
found, have you checked whether your application jar contains the required MessageBufferProcess.class?
If it does not, try the maven-assembly-plugin <https://maven.apache.org/plugins/maven-assembly-plugin/>
or the maven-shade-plugin <https://maven.apache.org/plugins/maven-shade-plugin/> to include
your classes.
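
The check suggested above can also be done programmatically. The following is a minimal sketch, not part of the original thread: the class name JarClassCheck and its helper methods are hypothetical, and the jar path passed on the command line is an assumption. It only uses java.util.jar.JarFile from the JDK to look for the .class entry.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper, not part of Flink or of the poster's project.
public class JarClassCheck {

    /** Converts a fully qualified class name into its entry path inside a jar. */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath contains the compiled class. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.out.println("usage: java JarClassCheck <path-to-job-jar>");
            return;
        }
        // Class name taken from the error message quoted in this thread.
        String cls = "de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess";
        System.out.println(cls + " present: " + containsClass(args[0], cls));
    }
}
```

Running it against the packaged job jar shows whether the class made it into the artifact. The same information is available from the command line with "jar tf <path-to-job-jar>".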

Yun Tang
From: Ly, The Anh <the.a.ly@campus.tu-berlin.de>
Sent: Friday, November 2, 2018 6:33
To: user@flink.apache.org
Subject: Starting a separate Java process within a Flink cluster


I am currently working on my master's and have encountered a difficult problem.

Background (for context): I am trying to connect different data stream processors. To do so,
I am using Flink's mechanism for creating custom sinks and sources to receive data from and
send data to other data stream processors. In those custom sinks and sources I start a
separate process (message-buffer-process), which they communicate with and which buffers the
data. My implementation is built with Maven and can be added as a dependency.

Problem: I already tested my implementation by adding it as a dependency to a simple Flink
word-count example. The test ran inside an IDE and worked perfectly fine. But when I package
that Flink word-count example and try to run it with "./flink run " or by uploading and
submitting it as a job, it tells me that my buffer-process class could not be found:

In German: "Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess
konnte nicht gefunden oder geladen werden"

Roughly translated: "Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess
could not be found or loaded"

Code snippets:

Example - Adding my custom sink to send data to another data stream processor:

                        new DSPConnectorConfig
                                .Builder("localhost", 9656)

The way I am trying to start the separate buffer-process: JavaProcessBuilder.exec(MessageBufferProcess.class,
connectionString, addSentMessagesFrame);
What JavaProcessBuilder.exec looks like:
public static Process exec(Class<?> javaClass, String connectionString, boolean addSentMessagesFrame)
        throws IOException, InterruptedException {
        // Locate the java binary of the JVM that is currently running.
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome +
                File.separator + "bin" +
                File.separator + "java";
        // Reuse the classpath of the current JVM for the child process.
        String classpath = System.getProperty("java.class.path");
        // getName() is what the java launcher expects; for a top-level class it
        // equals getCanonicalName(), for a nested class only getName() works.
        String className = javaClass.getName();

        System.out.println("Trying to build process " + classpath + " " + className);

        ProcessBuilder builder = new ProcessBuilder(
                javaBin, "-cp", classpath, className, connectionString, Boolean.toString(addSentMessagesFrame));

        // Start the child JVM and hand the process handle back to the caller.
        return builder.start();
}
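
One possible cause, which nothing in this thread confirms: Flink usually loads the classes of a submitted job jar through its own user-code class loader, so the job jar may be missing from the java.class.path system property of the JVM that runs the sink or source. The child JVM would then not see MessageBufferProcess even though the jar contains it. The sketch below resolves the location a class was loaded from and puts it in front of the inherited classpath. ChildClasspath and its methods are hypothetical names, and the sketch assumes the class comes from a plain file: location on local disk.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.security.CodeSource;

// Hypothetical helper, not part of Flink or of the poster's project.
public class ChildClasspath {

    /**
     * Returns the jar or directory the class was loaded from, or null if the
     * location is unknown (for example for JDK classes such as String).
     * Assumes a plain file: location; other URL schemes make new File(URI) throw.
     */
    static String locationOf(Class<?> cls) throws URISyntaxException {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return null;
        }
        return new File(source.getLocation().toURI()).getAbsolutePath();
    }

    /** Builds a classpath for a child JVM: the class's origin first, then the parent's classpath. */
    static String classpathFor(Class<?> cls) throws URISyntaxException {
        String parent = System.getProperty("java.class.path", "");
        String origin = locationOf(cls);
        if (origin == null) {
            return parent;
        }
        return parent.isEmpty() ? origin : origin + File.pathSeparator + parent;
    }

    public static void main(String[] args) throws URISyntaxException {
        // Prints the classpath a child JVM would need to load this very class.
        System.out.println(classpathFor(ChildClasspath.class));
    }
}
```

In JavaProcessBuilder.exec, the value of classpathFor(javaClass) could be passed to "-cp" instead of the raw java.class.path property. Comparing the two values in the "Trying to build process" log line on the cluster would show whether the job jar is in fact missing.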

I also tried running that message-buffer process on its own, from another Maven project and
its packaged .jar file. That worked perfectly fine too, which is why I assume that my approach
is not suitable for running inside Flink.
Did I miss something, or does starting a process this way simply not work within Flink's context?
I hope the information I gave is sufficient to understand my issue. If you need any more
information, feel free to message me!

Thanks for any help!

With best regards
