flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax" <mj...@apache.org>
Subject Re: Issue deploying a topology to flink with a java api
Date Thu, 14 Apr 2016 09:37:04 GMT
For the fix, you need to use the current development version of Flink,
ie, change your maven dependency from <version>1.0</version> to
<version>1.1-SNAPSHOT</version>

One question: what is FlinkGitService.class? It does only show up when
you get the ClassLoader:

> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) }, FlinkGitService.class.getClassLoader());

It is the class that contains methods deploy() and getFlinkTopology() ?

-Matthias

On 04/14/2016 05:20 AM, star jlong wrote:
> What I'm  trying to say is that to get submit the flink topology to flink, I had to do
an invocation of the mainMethod(which contain the actaul topology) of my topology with the
class java.lang.reflect.Method.That is if you a take look at the following the topology the
mainMethod is buildTopologypublic class WordCountTopology {
>     public static void main(String[] args) throws Exception {
> 
>     Config conf = new Config();
>     conf.setDebug(true);
>     if (args != null && args.length > 0) {
> 
>         conf.setNumWorkers(1);
>         conf.setMaxTaskParallelism(1);
>         FlinkSubmitter.submitTopology(args[0], conf, buildTopology());
> 
>     }
>     // Otherwise, we are running locally
>     else {
>         conf.setMaxTaskParallelism(1);
>         FlinkLocalCluster cluster = new FlinkLocalCluster();
>         cluster.submitTopology("word-count", conf, buildTopology());
>         Thread.sleep(10000);
>     }
> }
> 
> public static FlinkTopology buildTopology() {
> 
>     TopologyBuilder builder = new TopologyBuilder();
> 
>     builder.setSpout("spout", new RandomSentenceSpout(), 1);
>     builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
>     builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word"));
> 
>     builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt",
new OutputFormatter() {
>         private static final long serialVersionUID = 1L;
> 
>         @Override
>         public String format(Tuple tuple) {
>             return tuple.toString();
>         }
>     }), 1).shuffleGrouping("count");
> 
>     return FlinkTopology.createTopology(builder);
> 
> }
> }That is the method that I want to invoke from my jar so that I will be able to do the
submitting of the topology without any problem ie
> 
> final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);cluster.submitTopology(topologyId,
uploadedJarLocation, getFlinkTopogy(String.format("file://%s", jarPath),properties.getProperty("topologyMainClass"),
properties.getProperty("methodName")));
> Where getFlinkTopology() return the contains actually topology
> 
> But while doing that reflection I had an exception.
> 
> Another question please. How do I make used of the hotflix of Till. 
> 
>     Le Jeudi 14 avril 2016 0h19, Matthias J. Sax <mjsax@apache.org> a écrit :
>  
> 
>  I cannot follow completely in your last step when you fail. What do you
> mean by "I'm stuck at the level when I want to copy that from the jar to
> submit it to flink"?
> 
> Btw: I copied the code from the SO question and it works for me on the
> current master (which includes Till's hotfix).
> 
> -Matthias
> 
> 
> On 04/13/2016 09:39 PM, star jlong wrote:
>> Thanks Matthias for the reply. 
>> Maybe I should explain what I want to do better.My objective is to deploy a flink
topology on flink using java but in the production mode. For that here are the step that I
have taken.
>> 1-Convert a sample wordcount storm topology to a flink topology as indicated here
https://flink.apache.org/news/2015/12/11/storm-compatibility.html2-Run the topology on local
mode (with my IDE eclipse) and on production mode by assembling everything with a mvn clean
install then submitting the jar to flink on the command line with 
>> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar
myFlinkTopology
>> At this level everything went well.
>>
>> Then I wanted to submit the same jar to flink on the production mode by using a java
program. Then I decided to create a mainMethod in my topology that returns the flinkTopology
which I wanted to submit to flink using the FlinkClient. But I'm stuck at the level when I
want to copy that from the jar to submit it to flink.
>>
>> I know that is possible because I have used the same procedure with a storm topology
that it works perfectly well.
>> What I'm missing please?
>>   jstar
>>
>>     Le Mercredi 13 avril 2016 19h23, Matthias J. Sax <mjsax@apache.org> a écrit
:
>>   
>>
>>   Hi jstar,
>>
>> I need to have a close look. But I am wondering why you use reflection
>> in the first place? Is there any specific reason for that?
>>
>> Furthermore, the example provided in project maven-example also covers
>> the case to submit a topology to Flink via Java. Have a look at
>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter
>>
>> It contains a main() method and you can just run it as a regular Java
>> program in your IDE.
>>
>> The SO question example should also work; it also contains a main()
>> method, so you should be able to run it.
>>
>> Btw: If you use Storm-Compatiblitly-API there is no reason the get an
>> ExecutuionEnvironment in you code. This happen automatically with
>> FlinkClient/FlinkSubmitter.
>>
>> Furthermore, I would recommend to use FlinkSubmitter instead of
>> FlinkClient as it is somewhat simpler to use.
>>
>> About SO question: I guess the problem is the jar assembling. The user says
>>
>> "Since I'using maven to handle my dependencies, I do a Mvn clean install
>> to obtain the jar."
>>
>> I guess this is not sufficient to bundle a correct jar. Have a look into
>> pom.xml from storm-examples. It uses maven plug-ins in assemble the jar
>> correctly. (Regular maven artifact do not work for job submission...)
>>
>> Will have a close look and follow up... Hope this helps already.
>>
>> -Matthias
>>
>> On 04/13/2016 06:23 PM, star jlong wrote:
>>> Thanks for the reply.
>>> @Stephen, I try using RemoteEnvironment to submit my topology to flink. 
>>> Here is the try that I did RemoteEnvironment remote = new RemoteEnvironment(ipJobManager,
6123, jarPath); remote.execute();
>>> While running the program, this is the exception that I got.
>>> java.lang.RuntimeException: No data sinks have been created yet. A program needs
at least one sink that consumes data. Examples are writing the data set or printing it.
>>>   
>>>
>>>     Le Mercredi 13 avril 2016 16h54, Till Rohrmann <trohrmann@apache.org>
a écrit :
>>>   
>>>
>>>   I think this is not the problem here since the problem is still happening
>>> on the client side when the FlinkTopology tries to copy the registered
>>> spouts. This happens before the job is submitted to the cluster. Maybe
>>> Mathias could chime in here.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> For flink standalone programs, you would use a "RemoteEnvironment"
>>>>
>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api".
>>>> That one should deal with jars, classloaders, etc for you.
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <jlongstar@yahoo.fr.invalid>
>>>> wrote:
>>>>
>>>>> Thanks for the suggestion. Sure those examples are interesting and I
have
>>>>> deploy them successfully on flink. The deployment is done the command
>>>> line
>>>>> that is doing something like
>>>>> bin/flink run example.jarBut what I want is to submit the topology to
>>>>> flink using a java program.
>>>>>
>>>>> Thanks.
>>>>>
>>>>>     Le Mercredi 13 avril 2016 14h12, Chesnay Schepler <
>>>> chesnay@apache.org>
>>>>> a écrit :
>>>>>
>>>>>
>>>>>   you can find examples here:
>>>>>
>>>>>
>>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples
>>>>>
>>>>> we haven't established yet that it is an API issue; it could very well
>>>>> be caused by the reflection magic you're using...
>>>>>
>>>>> On 13.04.2016 14:57, star jlong wrote:
>>>>>> Ok, it seems like there an issue with the api. So please does anybody
>>>>> has a working example for deploying a topology using the flink dependency
>>>>> flink-storm_2.11 or any other will be welcoming.
>>>>>>
>>>>>> Thanks,
>>>>>> jstar
>>>>>>
>>>>>>       Le Mercredi 13 avril 2016 13h44, star jlong
>>>>> <jlongstar@yahoo.fr.INVALID> a écrit :
>>>>>>
>>>>>>
>>>>>>   Hi Schepler,
>>>>>>
>>>>>> Thanks for the concerned. Yes I'm actaully having the same issue
as
>>>>> indicated on that post because I'm the one that posted that issue.
>>>>>>
>>>>>>       Le Mercredi 13 avril 2016 13h35, Chesnay Schepler <
>>>>> chesnay@apache.org> a écrit :
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api
>>>>>>
>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote:
>>>>>>> Hi jstar,
>>>>>>>
>>>>>>> what's exactly the problem you're observing?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong
>>>> <jlongstar@yahoo.fr.invalid
>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi there,
>>>>>>>>
>>>>>>>> I'm jstar. I have been playing around with flink. I'm very
much
>>>>> interested
>>>>>>>> in submitting a topoloy  to flink using its api. As indicated
>>>>>>>> on stackoverflow, that is the try that I have given. But
I was stuck
>>>>> with
>>>>>>>> some exception. Please any help will be welcoming.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> jstar
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>   
>>>
>>
>>
>>   
>>
> 
> 
>   
> 


Mime
View raw message