flink-user mailing list archives

From "Matthias J. Sax" <mj...@apache.org>
Subject Re: Working with storm compatibility layer
Date Thu, 14 Jan 2016 12:25:42 GMT
Just saw your email after my answer...

Have a look here about task slots.
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
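
As a rough sketch of the arithmetic behind the exception in your log (this
is not Flink's actual scheduler code): assuming all operators share one slot
sharing group, as the SlotSharingGroup entry in the log suggests, a job needs
as many slots as its most parallel operator. The class and method names below
are made up for illustration, and layer = 3 is only inferred from the
"bolt (1/8)" lines.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SlotCheck {
    // Assumption: every operator is in the same slot sharing group, so the
    // job needs as many slots as its highest operator parallelism.
    public static int requiredSlots(Map<String, Integer> parallelismByOperator) {
        int max = 0;
        for (int p : parallelismByOperator.values()) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        int layer = 3; // hypothetical value of args[0]; 2^3 = 8 bolt instances
        Map<String, Integer> topology = new LinkedHashMap<>();
        topology.put("src", 1);           // "Source: src (1/1)" in the log
        topology.put("bolt", 1 << layer); // "bolt (1/8)" ... "bolt (8/8)"

        int needed = requiredSlots(topology);
        int available = 1; // "total number of slots=1" in the log
        System.out.println("needed=" + needed + ", available=" + available
                + ", fits=" + (needed <= available));
    }
}
```

With a single TaskManager and taskmanager.numberOfTaskSlots set to 1, this
prints needed=8, available=1, fits=false, which is consistent with the
NoResourceAvailableException.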

Also have a look here (starting from 18:30):
https://www.youtube.com/watch?v=UEkjRN8jRx4

-Matthias


On 01/14/2016 12:05 PM, Shinhyung Yang wrote:
> Dear Matthias,
> 
> Thank you very much again. I changed the value of
> taskmanager.numberOfTaskSlots from 1 to 128 in
> flink-0.10.1/conf/flink-conf.yaml, then stopped and restarted the
> local cluster. Now it works fine. (It is still running and I can see
> it clearly on the WebFrontend.) However, I'm not sure whether it is
> safe to keep the value like this.
> 
> Thank you.
> Best regards,
> Shinhyung
> 
> On Thu, Jan 14, 2016 at 7:53 PM, Shinhyung Yang
> <shinhyung.yang@gmail.com> wrote:
>> Dear Matthias,
>>
>> Thank you for the quick reply. It failed again; however, I was able to
>> access its WebFrontend, which gave me some logs. I wanted to show you the
>> logs right away, before digging into them.
>>
>> 19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
>> (topology).
>> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
>> (topology).
>> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
>> changed to RUNNING.
>> 19:48:18,014 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> CREATED to SCHEDULED
>> 19:48:18,014 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> SCHEDULED to DEPLOYING
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Deploying Source: src (1/1) (attempt #0) to localhost
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
>> SCHEDULED
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
>> DEPLOYING
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Deploying bolt (1/8) (attempt #0) to localhost
>> 19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>           - Received task Source: src (1/1)
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
>> SCHEDULED
>> 19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Loading JAR files for task Source: src (1/1)
>> 19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>           - Received task bolt (1/8)
>> 19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache
>>           - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
>> localhost/127.0.0.1:36498
>> 19:48:18,017 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> DEPLOYING to CANCELING
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
>> CANCELING
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Attempting to cancel task Source: src (1/1)
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Source: src (1/1) switched to CANCELING
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Attempting to cancel task bolt (1/8)
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - bolt (1/8) switched to CANCELING
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Loading JAR files for task bolt (1/8)
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
>> changed to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decrease the
>> operator parallelism or increase the number of slots per TaskManager
>> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
>> (unassigned) - [SCHEDULED] > with groupID <
>> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
>> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>> > . Resources available to scheduler: Number of instances=1, total
>> number of slots=1, available slots=0
>> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
>> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 19:48:18,019 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (5/8) (4cb013017e278161124c3c6549cd3f80) switched from CREATED to
>> CANCELED
>> 19:48:18,020 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (6/8) (a8737f034530b66a44e8c0a2cd60528d) switched from CREATED to
>> CANCELED
>> 19:48:18,020 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (7/8) (495b895a7f338647b876640f91d823d6) switched from CREATED to
>> CANCELED
>> 19:48:18,020 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (8/8) (0c45cccb3f5b786e4bdacd20d3e164c8) switched from CREATED to
>> CANCELED
>> 19:48:18,526 WARN  akka.remote.ReliableDeliverySupervisor
>>           - Association with remote system
>> [akka.tcp://flink@127.0.0.1:50370] has failed, address is now gated
>> for [5000] ms. Reason is: [Disassociated].
>> 19:48:18,527 WARN  akka.remote.ReliableDeliverySupervisor
>>           - Association with remote system
>> [akka.tcp://flink@127.0.0.1:54918] has failed, address is now gated
>> for [5000] ms. Reason is: [Disassociated].
>> 19:48:18,531 WARN  akka.remote.ReliableDeliverySupervisor
>>           - Association with remote system
>> [akka.tcp://flink@127.0.0.1:53543] has failed, address is now gated
>> for [5000] ms. Reason is: [Disassociated].
>> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Source: src (1/1) switched to CANCELED
>> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - bolt (1/8) switched to CANCELED
>> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Freeing task resources for Source: src (1/1)
>> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Freeing task resources for bolt (1/8)
>> 19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>           - Unregistering task and sending final execution state
>> CANCELED to JobManager for task Source: src
>> (3535644576ae695d2685a65401e16fc4)
>> 19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>           - Unregistering task and sending final execution state
>> CANCELED to JobManager for task bolt
>> (391ac2875a2fdc86d8af4f2d51e3e849)
>> 19:48:18,770 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CANCELING to
>> CANCELED
>> 19:48:18,770 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> CANCELING to CANCELED
>> 19:48:18,773 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
>> changed to FAILED.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decrease the
>> operator parallelism or increase the number of slots per TaskManager
>> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
>> (unassigned) - [SCHEDULED] > with groupID <
>> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
>> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>> > . Resources available to scheduler: Number of instances=1, total
>> number of slots=1, available slots=0
>> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
>> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> From a quick look at the log, it seems that my configuration has a problem?
>>
>> Thank you.
>> Best regards,
>> Shinhyung
>>
>> On Thu, Jan 14, 2016 at 7:29 PM, Matthias J. Sax <mjsax@apache.org> wrote:
>>> Hi,
>>>
>>> I can submit the topology without any problems. Your code is fine.
>>>
>>> If your program "exits silently", I would actually assume that you
>>> submitted the topology successfully. Can you see the topology in the
>>> JobManager WebFrontend? If not, do you see any errors in the log files?
>>>
>>> -Matthias
>>>
>>> On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
>>>> Dear Matthias,
>>>>
>>>> Thank you for the reply! I am so sorry to respond late on the matter.
>>>>
>>>>> I just double checked the Flink code and during translation from Storm
>>>>> to Flink declareOutputFields() is called twice. You are right that it
>>>>> does the same job twice, but that is actually not a problem. The Flink
>>>>> code is cleaner this way, so I guess we will not change it.
>>>>
>>>> Thank you for checking. I don't think it contributes to my
>>>> current problem anyway. In my case, though, it is called 3 times, if
>>>> the number matters at all.
>>>>
>>>>> About lifecycle:
>>>>> If you submit your code, during deployment, Spout.open() and
>>>>> Bolt.prepare() should be called for each parallel instance of each
>>>>> Spout/Bolt of your topology.
>>>>>
>>>>> About your submission (I guess this should solve your current problem):
>>>>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>>>>> but FlinkSubmitter. You have to distinguish three cases:
>>>>>
>>>>>   - local/debug/IDE mode: use FlinkLocalCluster
>>>>>     => you do not need to start any Flink cluster beforehand --
>>>>> FlinkLocalCluster is started up in your current JVM
>>>>>     * the purpose is local debugging in an IDE (this makes it easy to
>>>>> set breakpoints and debug code)
>>>>>
>>>>>   - pseudo-distributed mode: use FlinkSubmitter
>>>>>     => you start up a local Flink cluster via bin/start-local.sh
>>>>>     * this local Flink cluster runs in its own JVM and looks like a real
>>>>> cluster to the Flink client, i.e., "bin/flink run"
>>>>>     * thus, you just use FlinkSubmitter as for a real cluster (with
>>>>> JobManager/Nimbus hostname "localhost")
>>>>>     * in contrast to FlinkLocalCluster, no "internal Flink Cluster" is
>>>>> started in your current JVM, but your code is shipped to the local
>>>>> cluster you started up beforehand via bin/start-local.sh and executed
>>>>> in that JVM
>>>>>
>>>>>   - distributed mode: use FlinkSubmitter
>>>>>     => you start up Flink in a real cluster using bin/start-cluster.sh
>>>>>     * you use "bin/flink run" to submit your code to the real cluster
>>>>
>>>> Thank you for the explanation, now I have a clearer understanding of
>>>> clusters and submitters. However, my problem is not fixed yet. Here's
>>>> my code:
>>>>
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>> // ./src/main/java/myexample/App.java
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>>
>>>> package myexample;
>>>>
>>>> import backtype.storm.Config;
>>>> import backtype.storm.LocalCluster;
>>>> import myexample.spout.StandaloneSpout;
>>>> import backtype.storm.generated.StormTopology;
>>>> import backtype.storm.topology.IRichSpout;
>>>> import backtype.storm.topology.TopologyBuilder;
>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>
>>>> import myexample.bolt.Node;
>>>> import myexample.bolt.StandardBolt;
>>>>
>>>> import java.util.Arrays;
>>>> import java.util.List;
>>>>
>>>> //
>>>> import org.apache.flink.storm.api.FlinkTopology;
>>>> //import org.apache.flink.storm.api.FlinkLocalCluster;
>>>> import org.apache.flink.storm.api.FlinkSubmitter;
>>>> //import org.apache.flink.storm.api.FlinkClient;
>>>> import org.apache.flink.storm.api.FlinkTopologyBuilder;
>>>>
>>>> public class App
>>>> {
>>>>     public static void main( String[] args ) throws Exception
>>>>     {
>>>>         int layer = 0;
>>>>         StandaloneSpout spout = new StandaloneSpout();
>>>>         Config conf = new Config();
>>>>         conf.put(Config.TOPOLOGY_DEBUG, false);
>>>>         //FlinkLocalCluster cluster = new FlinkLocalCluster();
>>>>         //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>>>>         //LocalCluster cluster = new LocalCluster();
>>>>
>>>>         layer = Integer.parseInt(args[0]);
>>>>         //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
>>>>         FlinkSubmitter.submitTopology("topology", conf, BinaryTopology(spout, layer));
>>>>         //Thread.sleep(5 * 1000);
>>>>         //FlinkClient.getConfiguredClient(conf).killTopology("topology");
>>>>         //cluster.killTopology("topology");
>>>>         //cluster.shutdown();
>>>>     }
>>>>
>>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
>>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n) {
>>>>         return BinaryTopology(input, n,
>>>> Arrays.asList((BaseBasicBolt)new StandardBolt()));
>>>>     }
>>>>
>>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
>>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
>>>>         FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>>>>         //TopologyBuilder builder = new TopologyBuilder();
>>>>         String sourceId = "src";
>>>>         builder.setSpout(sourceId, input);
>>>>
>>>>
>>>>         String boltId = "bolt";
>>>>         builder.setBolt(boltId, new Node(), Math.pow(2,
>>>> n)).shuffleGrouping(sourceId);
>>>>
>>>>         return builder.createTopology();
>>>>     }
>>>> }
>>>>
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>> // ./src/main/java/myexample/spout/StandaloneSpout.java
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>>
>>>> package myexample.spout;
>>>>
>>>> import backtype.storm.spout.SpoutOutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichSpout;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Values;
>>>>
>>>> import java.io.*;
>>>> import java.text.DateFormat;
>>>> import java.text.SimpleDateFormat;
>>>> import java.util.*;
>>>>
>>>> public class StandaloneSpout extends BaseRichSpout {
>>>>
>>>>     private SpoutOutputCollector mCollector;
>>>>
>>>>     @Override
>>>>     public void open(Map conf, TopologyContext context,
>>>> SpoutOutputCollector collector) {
>>>>         this.mCollector = collector;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void nextTuple() {
>>>>         long currentTime = System.currentTimeMillis();
>>>>
>>>>         // TODO: Currently, do not check bound of list, because of experiment.(Avoid branch)
>>>>         mCollector.emit(new Values(new String("aaa"), System.currentTimeMillis(), 0));
>>>>
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         System.out.println("declareOutputFields");
>>>>         declarer.declare(new Fields("string1", "timestamp", "omitted"));
>>>>     }
>>>> }
>>>>
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>> // ./src/main/java/myexample/bolt/Node.java
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>>
>>>> package myexample.bolt;
>>>>
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.BasicOutputCollector;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>> import java.util.Map;
>>>>
>>>> public class Node extends BaseBasicBolt {
>>>>
>>>>     public static boolean isTupleEmpty(Tuple tuple) {
>>>>       return false;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>         super.prepare(stormConf, context);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void cleanup() {
>>>>         super.cleanup();
>>>>     }
>>>>
>>>>     @Override
>>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>>         collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("string1", "string2", "timestamp",
>>>> "omitted"));
>>>>     }
>>>> }
>>>>
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>> // ./src/main/java/myexample/bolt/StandardBolt.java
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>>
>>>> package myexample.bolt;
>>>>
>>>> import java.util.Map;
>>>>
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.BasicOutputCollector;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>> import backtype.storm.tuple.Tuple;
>>>>
>>>> public class StandardBolt extends BaseBasicBolt {
>>>>
>>>>     @Override
>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>         super.prepare(stormConf, context);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>     }
>>>> }
>>>>
>>>> The problem is probably either in the source code or somewhere in the
>>>> project environment. I would really appreciate it if you could verify
>>>> whether the code looks OK or not.
>>>>
>>>>>
>>>>> About further debugging: you can increase the log level to get more
>>>>> information:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html
>>>>
>>>> I tried to inject the `log4j.properties' file that I got from a sample
>>>> flink-quickstart-java application created with `mvn
>>>> archetype:generate' into ./target/*.jar, but it does not work. I tried
>>>> this because placing that `log4j.properties' file under
>>>> ./src/main/resources of my project did not work in the first place.
>>>>
>>>> Thank you again for your help.
>>>> With best regards,
>>>> Shinhyung
>>>>
>>>>> Hope this helps!
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 01/09/2016 04:38 PM, Shinhyung Yang wrote:
>>>>>> Dear Matthias,
>>>>>>
>>>>>> Thank you for replying!
>>>>>>
>>>>>>     that sounds weird and should not happen -- Spout.open() should get
>>>>>>     called exactly once.
>>>>>>
>>>>>>
>>>>>> That's what I thought too. I'm new to both Storm and Flink, so it's quite
>>>>>> complicated for me to handle both yet; would it help me if I
>>>>>> knew Storm's lifecycle and Flink's lifecycle? When submitTopology() is
>>>>>> invoked, what should be called other than spout.open()?
>>>>>>
>>>>>>     I am not sure about multiple calls to
>>>>>>     declareOutputFields though -- it might be called multiple times -- would
>>>>>>     need to double check the code.
>>>>>>
>>>>>>
>>>>>> I'll check my code too.
>>>>>>
>>>>>>
>>>>>>     However, the call to declareOutputFields should be idempotent, so it
>>>>>>     should actually not be a problem if it is called multiple times. Even if
>>>>>>     Storm might call this method only once, there is no guarantee that it is
>>>>>>     not called multiple times. If this is a problem for you, please let me
>>>>>>     know. I think we could fix this and make sure the method is only called
>>>>>>     once.
>>>>>>
>>>>>>
>>>>>> Actually it doesn't seem to be a problem for now. It just does the same
>>>>>> job multiple times.
>>>>>>
>>>>>>
>>>>>>     It would be helpful if you could share your code. What do you mean by
>>>>>>     "exits silently"? No submission happens? Did you check the logs? As you
>>>>>>     mentioned FlinkLocalCluster, I assume that you run within an IDE?
>>>>>>
>>>>>>
>>>>>> The topology doesn't seem to continue. There's a set of initialization
>>>>>> code in the open method of the program's spout and it looks hopeless if
>>>>>> it's not invoked. Is there any way to check the logs other than using
>>>>>> println() calls? I'm running it on the command line, with
>>>>>> `bin/start-local.sh' running in the background, via `bin/flink run'.
>>>>>>
>>>>>>
>>>>>>     Btw: lately we fixed a couple of bugs. I would suggest that you use the
>>>>>>     latest version from Flink master branch. It should work with 0.10.1
>>>>>>     without problems.
>>>>>>
>>>>>>
>>>>>> It was very tedious for me to deal with a pom.xml file and .m2
>>>>>> repository. So I preferred to use Maven Central. But I should try with
>>>>>> the master branch if I have to.
>>>>>>
>>>>>> I will quickly check if I could share some of the code.
>>>>>>
>>>>>> Thank you again for the help!
>>>>>> With best regards,
>>>>>> Shinhyung Yang
>>>>>>
>>>>>>
>>>>>>
>>>>>>     -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>     On 01/09/2016 01:27 AM, Shinhyung Yang wrote:
>>>>>>     > Howdies to everyone,
>>>>>>     >
>>>>>>     > I'm trying to use the storm compatibility layer on Flink 0.10.1. The
>>>>>>     > original storm topology works fine on Storm 0.9.5 and I have
>>>>>>     > incorporated FlinkLocalCluster, FlinkTopologyBuilder, and
>>>>>>     > FlinkTopology classes according to the programming guide
>>>>>>     > (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
>>>>>>     > I'm running it on Oracle Java 8 (1.8.0_66-b17) on CentOS 7 (7.2.1511).
>>>>>>     > What happens is, it seems to go all the way to the submitTopology
>>>>>>     > method without any problem; however, it doesn't invoke the open method of
>>>>>>     > the Spout class, but the declareOutputFields method is called multiple
>>>>>>     > times and the program exits silently. Do you guys have any idea what's
>>>>>>     > going on here or have any suggestions? If needed, then please ask me
>>>>>>     > for more information.
>>>>>>     >
>>>>>>     > Thank you for reading.
>>>>>>     > With best regards,
>>>>>>     > Shinhyung Yang
>>>>>>     >
>>>>>>
>>>>>
>>>

