Subject: Re: How to add jars to flink
From: Robert Metzger
Date: Wed, 7 Jan 2015 15:11:43 +0100
To: user@flink.incubator.apache.org

Does one of the Hadoop configuration files contain an entry with the key
"fs.xtreemfs.impl"?

On Wed, Jan 7, 2015 at 3:05 PM, Lukas Kairies wrote:

> Unfortunately, it does not work with XtreemFS. I set "fs.hdfs.hadoopconf"
> to the Hadoop configuration directory and tried to run the word count
> example:
>
>     bin/flink run -v examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar \
>         xtreemfs:///test.txt xtreemfs:///result.txt
>
> The following error occurred:
>
> Error: The main method caused an error.
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:449)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:350)
>     at org.apache.flink.client.program.Client.run(Client.java:242)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:349)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:336)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:976)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1000)
> Caused by: java.io.IOException: The given file URI (xtreemfs:///result.txt)
> points to the HDFS NameNode at null, but the File System could not be
> initialized with that address: port out of range:-1
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:325)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:244)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:299)
>     at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:267)
>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.yarn.YarnJobManager$$anonfun$receiveYarnMessages$1.applyOrElse(YarnJobManager.scala:68)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: port out of range:-1
>     at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
>     at java.net.InetSocketAddress.<init>(InetSocketAddress.java:224)
>     at org.xtreemfs.common.libxtreemfs.Helper.stringToInetSocketAddress(Helper.java:82)
>     at org.xtreemfs.common.libxtreemfs.RPCCaller.getInetSocketAddressFromAddress(RPCCaller.java:301)
>     at org.xtreemfs.common.libxtreemfs.RPCCaller.syncCall(RPCCaller.java:88)
>     at org.xtreemfs.common.libxtreemfs.RPCCaller.syncCall(RPCCaller.java:49)
>     at org.xtreemfs.common.libxtreemfs.ClientImplementation.listVolumeNames(ClientImplementation.java:529)
>     at org.xtreemfs.common.clients.hadoop.XtreemFSFileSystem.initialize(XtreemFSFileSystem.java:164)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:311)
>     ... 31 more
>
> On 07.01.2015 at 14:04, Stephan Ewen wrote:
>
> Hi!
>
> You can reference a Hadoop configuration with a defaultFS entry via
> "fs.hdfs.hadoopconf".
>
> Have a look at the configuration reference for details:
> http://flink.incubator.apache.org/docs/0.7-incubating/config.html
>
> Let us know if it works for XtreemFS...
>
> Greetings,
> Stephan
>
> On 07.01.2015 at 13:51, "Lukas Kairies" <lukas.xtreemfs@googlemail.com> wrote:
>
>> Thanks, now it works :) Is it possible to set a default file system in
>> Flink like in Hadoop (with fs.default.name)? Currently I always have to
>> set the complete file URI like xtreemfs://<host>:<port>/file
>>
>> Best,
>> Lukas
>>
>> On 07.01.2015 at 12:22, Robert Metzger wrote:
>>
>> Hi Lukas,
>>
>> I see that there is an XtreemFS Hadoop client
>> (http://www.xtreemfs.org/download.php?t=source). With this pending pull
>> request https://github.com/apache/flink/pull/268 you can use all file
>> systems supported by Hadoop with Flink (we support the
>> org.apache.hadoop.FileSystems interface).
>>
>> The pull request has not been merged yet because of a failing test, but
>> that should not affect you.
>> If you want, you can check out the branch of my pull request:
>>
>>     git clone https://github.com/rmetzger/flink.git
>>     cd flink
>>     git checkout flink1266
>>     mvn clean install -DskipTests
>>
>> The finished build is in the "flink-dist/target/flink-XXX/flink-yarn-XXX/"
>> directory.
>>
>> Let me know if you need more help or information.
>>
>> Best,
>> Robert
>>
>>
>> On Wed, Jan 7, 2015 at 12:00 PM, Lukas Kairies
>> <lukas.xtreemfs@googlemail.com> wrote:
>>
>>> Hello,
>>>
>>> I would like to test Flink on YARN with the alternative file system
>>> XtreemFS. Therefore I have to add a jar file to Flink, but I found no
>>> way to do so. How can I do this? Hadoop works fine with XtreemFS.
>>>
>>> Thanks
>>>
>>> Lukas
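For context on Robert's question about "fs.xtreemfs.impl": with a Hadoop-compatible file system, the scheme-to-class mapping normally lives in the core-site.xml inside the directory that Flink's "fs.hdfs.hadoopconf" option points to. A minimal sketch follows; the implementation class matches the XtreemFSFileSystem seen in the stack trace above, but the host, port, and volume name are illustrative assumptions, not values from this thread:

```xml
<!-- core-site.xml in the directory referenced by fs.hdfs.hadoopconf -->
<configuration>
  <!-- Map the xtreemfs:// URI scheme to the XtreemFS Hadoop client class -->
  <property>
    <name>fs.xtreemfs.impl</name>
    <value>org.xtreemfs.common.clients.hadoop.XtreemFSFileSystem</value>
  </property>
  <!-- Default file system, so paths may omit host and port
       (hypothetical address and volume) -->
  <property>
    <name>fs.default.name</name>
    <value>xtreemfs://dir-host:32638/myVolume</value>
  </property>
</configuration>
```

The "port out of range:-1" in the stack trace is consistent with the client receiving a URI like xtreemfs:///result.txt that carries no host:port and finding no usable default to fall back on.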