flink-user mailing list archives

From "Ly, The Anh" <the.a...@campus.tu-berlin.de>
Subject Re: Starting a seperate Java process within a Flink cluster
Date Fri, 02 Nov 2018 09:22:22 GMT
Yes, I did. It is definitely there. I also created a separate Maven project to test whether
something was wrong with my jar.
The resulting shaded jar of that test project was fine, and the message-buffer-process ran
with that test jar.


On 02.11.2018 04:47, Yun Tang <myasuka@live.com> wrote:
Hi

Since you use the message-buffer-process as a dependency and the error says the class was not
found, have you checked whether your application jar actually contains MessageBufferProcess.class?
If it does not, try using the maven-assembly-plugin <https://maven.apache.org/plugins/maven-assembly-plugin/>
or the maven-shade-plugin <https://maven.apache.org/plugins/maven-shade-plugin/> to include
your classes.
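
The check suggested above can also be done programmatically. The following is only a minimal sketch using the JDK's java.util.jar.JarFile; the class name JarClassCheck and the method containsClass are hypothetical names chosen for illustration and are not part of the original project or of Flink.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: reports whether a jar contains the .class entry
// for a given fully qualified class name.
final class JarClassCheck {

    static boolean containsClass(String jarPath, String className) throws IOException {
        // Jar entries use '/' as the package separator and end in ".class".
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarClassCheck <path-to-jar> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(containsClass(args[0], args[1]) ? "found" : "missing");
    }
}
```

It would be called with the path of the packaged job jar and the class name from the error message, for example de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess.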

Best
Yun Tang
________________________________
From: Ly, The Anh <the.a.ly@campus.tu-berlin.de>
Sent: Friday, November 2, 2018 6:33
To: user@flink.apache.org
Subject: Starting a seperate Java process within a Flink cluster


Hello,


I am currently working on my master's and have run into a difficult problem.


Background (for context): I am trying to connect different data stream processors. To do this,
I use Flink's mechanism for creating custom sinks and sources to receive from and send to
other data stream processors. In those custom sinks and sources I start a separate process
(message-buffer-process), which they communicate with and buffer data into. My implementation
is built with Maven and could be added as a dependency.


Problem: I already tested my implementation by adding it as a dependency to a simple Flink
word-count example. Running the test inside an IDE works perfectly fine. But when I package
that Flink word-count example and try to run it with "./flink run " or by uploading and
submitting it as a job, it tells me that my buffer-process class could not be found:

In German: "Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess
konnte nicht gefunden oder geladen werden"

Roughly translated: "Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess
could not be found or loaded"


Code snippets:

Example - Adding my custom sink to send data to another data stream processor:

dataStream.addSink(
        (SinkFunction)DSPConnectorFactory
                .getInstance()
                .createSinkConnector(
                        new DSPConnectorConfig
                                .Builder("localhost", 9656)
                                .withDSP("flink")
                                .withBufferConnectorString("buffer-connection-string")
                                .withHWM(20)
                                .withTimeout(10000)
                                .build()));



The way I am trying to start the separate buffer process:
JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);

What JavaProcessBuilder.exec looks like:
public static Process exec(Class javaClass, String connectionString, boolean addSentMessagesFrame)
        throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome +
                File.separator + "bin" +
                File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = javaClass.getCanonicalName();

        System.out.println("Trying to build process " + classpath + " " + className);

        ProcessBuilder builder = new ProcessBuilder(
                javaBin, "-cp", classpath, className, connectionString, Boolean.toString(addSentMessagesFrame));

        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);

        Process process = builder.start();
        return process;
}

I also ran that message-buffer process separately from another Maven project and its
packaged .jar file. That worked perfectly fine too, which is why I assume that my approach
is not appropriate for running in Flink.
Did I miss something, or does starting a process this way simply not work within Flink's context?
I hope the information I gave is sufficient to understand my issue. If you need
any more information, feel free to message me!
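
One possible cause, which this thread does not confirm: Flink loads a job's classes from the submitted jar through its own class loader, so the java.class.path property of the worker JVM may not list that jar at all, and a child JVM started with only that classpath would then not find the main class. The sketch below asks the class itself where it was loaded from and puts that location first on the child's classpath. ChildClasspath, locationOf, classpathFor and command are hypothetical names, and the approach assumes the job jar is a plain file on local disk.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds a child-JVM command line whose classpath
// includes the jar or directory the main class was actually loaded from.
final class ChildClasspath {

    // Returns the local path the class was loaded from, or null if unknown.
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return null; // e.g. JDK classes have no code source
        }
        URL location = source.getLocation();
        if (!"file".equals(location.getProtocol())) {
            return null; // assumption: only plain local files are handled
        }
        try {
            return new File(location.toURI()).getAbsolutePath();
        } catch (URISyntaxException | IllegalArgumentException e) {
            return null;
        }
    }

    // Prepends the class's own location to the inherited java.class.path.
    static String classpathFor(Class<?> clazz) {
        String inherited = System.getProperty("java.class.path", "");
        String own = locationOf(clazz);
        if (own == null) {
            return inherited;
        }
        return inherited.isEmpty() ? own : own + File.pathSeparator + inherited;
    }

    // Builds the argument list for ProcessBuilder.
    static List<String> command(Class<?> mainClass, String... args) {
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        List<String> cmd = new ArrayList<>();
        cmd.add(javaBin);
        cmd.add("-cp");
        cmd.add(classpathFor(mainClass));
        // getName() yields the binary name that the java launcher expects.
        cmd.add(mainClass.getName());
        cmd.addAll(Arrays.asList(args));
        return cmd;
    }
}
```

The resulting list could be passed to new ProcessBuilder(...) in place of the hand-built arguments. If the buffer process needs further dependencies that are only inside the job jar, they would have to be shaded into that jar for this to work.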

Thanks for any help!

With best regards

