Subject: Re: question on flink-storm-examples
From: Jerry Peng
To: user@flink.apache.org
Date: Tue, 1 Sep 2015 16:33:05 -0500

Yes, that's what I did, and everything seems to execute fine, but when I
try to run WordCount-StormTopology with a file on HDFS I get a
java.io.FileNotFoundException:

java.lang.RuntimeException: java.io.FileNotFoundException:
/home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or directory)
        at org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:50)
        at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException:
/home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at java.io.FileInputStream.<init>(FileInputStream.java:93)
        at java.io.FileReader.<init>(FileReader.java:58)
        at org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:48)

However, I do have that file in my HDFS namespace:

$ hadoop fs -ls -R /
15/09/01 21:25:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
drwxr-xr-x   - jerrypeng supergroup          0 2015-08-21 14:40 /home
drwxr-xr-x   - jerrypeng supergroup          0 2015-08-21 14:40 /home/jerrypeng
drwxr-xr-x   - jerrypeng supergroup          0 2015-08-21 14:41 /home/jerrypeng/hadoop
drwxr-xr-x   - jerrypeng supergroup          0 2015-08-21 14:40 /home/jerrypeng/hadoop/dir
drwxr-xr-x   - jerrypeng supergroup          0 2015-08-24 16:06 /home/jerrypeng/hadoop/hadoop_dir
drwxr-xr-x   - jerrypeng supergroup          0 2015-09-01 20:48 /home/jerrypeng/hadoop/hadoop_dir/data
-rw-r--r--   3 jerrypeng supergroup      18552 2015-09-01 19:18 /home/jerrypeng/hadoop/hadoop_dir/data/data.txt
-rw-r--r--   3 jerrypeng supergroup          0 2015-09-01 20:48 /home/jerrypeng/hadoop/hadoop_dir/data/result.txt
drwxr-xr-x   - jerrypeng supergroup          0 2015-08-21 14:41 /home/jerrypeng/hadoop/hadoop_dir/dir1
drwxr-xr-x   - jerrypeng supergroup          0 2015-08-24 15:59 /home/jerrypeng/hadoop/hadoop_dir/test
-rw-r--r--   3 jerrypeng supergroup         32 2015-08-24 15:59 /home/jerrypeng/hadoop/hadoop_dir/test/filename.txt

Any idea what's going on?

On Tue, Sep 1, 2015 at 4:20 PM, Matthias J. Sax <mjsax@informatik.hu-berlin.de> wrote:
> You can use "bin/flink cancel JOBID" or the JobManager WebUI to cancel
> the running job.
>
> The exception you see occurs in FlinkSubmitter.killTopology(...), which
> is not used by "bin/flink cancel" or the JobManager WebUI.
>
> If you compile the example yourself, just remove the call to
> killTopology().
>
> -Matthias
>
> On 09/01/2015 11:16 PM, Matthias J. Sax wrote:
>> Oh yes, I forgot about this. I already have a fix for it in a pending
>> pull request... I hope that this PR is merged soon...
>>
>> If you want to observe the progress, look here:
>> https://issues.apache.org/jira/browse/FLINK-2111
>> and
>> https://issues.apache.org/jira/browse/FLINK-2338
>>
>> This PR resolves both and fixes the problem you observed:
>> https://github.com/apache/flink/pull/750
>>
>> -Matthias
>>
>>
>> On 09/01/2015 11:09 PM, Jerry Peng wrote:
>>> Hello,
>>>
>>> I corrected the number of slots for each task manager, but now when I
>>> try to run the WordCount-StormTopology, the JobManager daemon on my
>>> master node crashes and I get this exception in the log:
>>>
>>> java.lang.Exception: Received a message
>>> CancelJob(6a4b9aa01ec87db20060210e5b36065e) without a leader session ID,
>>> even though the message requires a leader session ID.
>>>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:41)
>>>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> It seems to have something to do with canceling of the topology after
>>> the sleep. Any ideas?
>>>
>>> Best,
>>>
>>> Jerry
>>>
>>> On Tue, Sep 1, 2015 at 3:33 PM, Matthias J. Sax
>>> <mjsax@informatik.hu-berlin.de> wrote:
>>>
>>>> Yes. That is what I expected.
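The slot fix discussed above amounts to a one-line change in conf/flink-conf.yaml. A sketch, with the value 4 taken from the example's hard-coded parallelism mentioned later in this thread (the default is lower, so the job cannot get enough slots without it):

```yaml
# conf/flink-conf.yaml -- sketch: WordCount-StormTopology needs at least
# 4 task slots because the example uses a hard-coded dop of 4.
taskmanager.numberOfTaskSlots: 4
```

The TaskManager must be restarted after changing this value for it to take effect.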
>>>> JobManager cannot start the job, due to too few task slots. It logs
>>>> the exception NoResourceAvailableException (it is not shown on stdout;
>>>> see the "log" folder). There is no feedback to the Flink CLI that the
>>>> job could not be started.
>>>>
>>>> Furthermore, WordCount-StormTopology sleeps for 5 seconds and then
>>>> tries to "kill" the job. However, because the job was never started,
>>>> there is a NotAliveException, which is printed to stdout.
>>>>
>>>> -Matthias
>>>>
>>>> On 09/01/2015 10:26 PM, Jerry Peng wrote:
>>>>> When I run WordCount-StormTopology I get the following exception:
>>>>>
>>>>> ~/flink/bin/flink run WordCount-StormTopology.jar
>>>>> hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/data.txt
>>>>> hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/results.txt
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>>>>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>>         at org.apache.flink.client.program.Client.run(Client.java:278)
>>>>>         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>>>>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>>>>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>>>>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
>>>>> Caused by: NotAliveException(msg:null)
>>>>>         at org.apache.flink.stormcompatibility.api.FlinkClient.killTopologyWithOpts(FlinkClient.java:209)
>>>>>         at org.apache.flink.stormcompatibility.api.FlinkClient.killTopology(FlinkClient.java:203)
>>>>>         at org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter.main(StormWordCountRemoteBySubmitter.java:80)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>>         ... 6 more
>>>>>
>>>>> The exception above occurred while trying to run your command.
>>>>>
>>>>> Any idea how to fix this?
>>>>>
>>>>> On Tue, Sep 1, 2015 at 3:10 PM, Matthias J. Sax
>>>>> <mjsax@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> Hi Jerry,
>>>>>>
>>>>>> WordCount-StormTopology uses a hard-coded dop of 4. If you start up
>>>>>> Flink in local mode (bin/start-local-streaming.sh), you need to
>>>>>> increase the number of task slots to at least 4 in
>>>>>> conf/flink-conf.yaml before starting Flink ->
>>>>>> taskmanager.numberOfTaskSlots
>>>>>>
>>>>>> You should actually see the following exception in
>>>>>> log/flink-...-jobmanager-...log
>>>>>>
>>>>>>> NoResourceAvailableException: Not enough free slots available to
>>>>>>> run the job. You can decrease the operator parallelism or increase
>>>>>>> the number of slots per TaskManager in the configuration.
>>>>>>
>>>>>> WordCount-StormTopology does use StormWordCountRemoteBySubmitter
>>>>>> internally. So, you do use it already ;)
>>>>>>
>>>>>> I am not sure what you mean by "get rid of KafkaSource"? It is
>>>>>> still in the code base. Which version do you use?
>>>>>> In flink-0.10-SNAPSHOT it is located in the submodule
>>>>>> "flink-connector-kafka" (which is a submodule of
>>>>>> "flink-streaming-connector-parent" -- which is a submodule of
>>>>>> "flink-streaming-parent").
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 09/01/2015 09:40 PM, Jerry Peng wrote:
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have some questions regarding how to run one of the
>>>>>>> flink-storm-examples, the WordCountTopology. How should I run the
>>>>>>> job? On GitHub it says I should just execute
>>>>>>> "bin/flink run example.jar", but when I execute:
>>>>>>>
>>>>>>> bin/flink run WordCount-StormTopology.jar
>>>>>>>
>>>>>>> nothing happens. What am I doing wrong? And how can I run the
>>>>>>> WordCountTopology via StormWordCountRemoteBySubmitter?
>>>>>>>
>>>>>>> Also, why did you guys get rid of the KafkaSource class? What is
>>>>>>> the API now for subscribing to a Kafka source?
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Jerry
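One plausible reading of the FileNotFoundException at the top of this message is visible in the stack trace itself: StormFileSpout.open constructs a java.io.FileReader, and FileReader resolves paths against the local filesystem only; it knows nothing about HDFS, so a file that exists only in the HDFS namespace fails to open on the TaskManager. A minimal local-only sketch of that failure mode (the class and helper names here are hypothetical, not part of Flink):

```java
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;

public class SpoutOpenSketch {

    // True only if the path can be opened via the *local* filesystem, which
    // is all java.io.FileReader (used inside StormFileSpout.open, per the
    // stack trace) can do -- it has no notion of HDFS URIs or namespaces.
    static boolean canOpenLocally(String path) {
        try (FileReader reader = new FileReader(path)) {
            return true;
        } catch (FileNotFoundException e) {
            // Exactly the failure in the trace: the file may exist in HDFS,
            // but not on this machine's local disk.
            return false;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The HDFS-only data file from the thread is not a local path here.
        System.out.println(canOpenLocally("/home/jerrypeng/hadoop/hadoop_dir/data/data.txt"));
    }
}
```

Under this reading, the spout would need to read the file through Hadoop's FileSystem API, or the file would have to be copied to every TaskManager's local disk; this is worth verifying against the actual StormFileSpout source.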