flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: org.apache.flink.core.fs.Path error?
Date Thu, 20 Oct 2016 14:05:51 GMT
I believe I found the issue: the ContinuousFileMonitoringFunction never
converts the given string to a Path, but directly generates a URI from it.
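As an aside, this failure mode is easy to reproduce outside Flink with plain `java.net.URI` (a minimal sketch; the path is the one from the thread): the drive letter is parsed as the URI scheme, which is exactly the scheme lookup that fails in the log below.

```java
import java.net.URI;

public class DriveLetterAsScheme {
    public static void main(String[] args) {
        // A bare Windows path handed directly to URI: "D" becomes the
        // scheme, so a file system lookup for scheme "D" fails.
        URI bad = URI.create("D:/dir/myfile.csv");
        System.out.println(bad.getScheme()); // prints "D"
        System.out.println(bad.getSchemeSpecificPart()); // prints "/dir/myfile.csv"
    }
}
```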

On 20.10.2016 15:48, Fabian Hueske wrote:
> The error message suggests that Flink tries to resolve "D:" as a file
> system scheme such as "file:" or "hdfs:".
>
> Can you try to specify your path as "file:/D:/dir/myfile.csv"?
>
> Best, Fabian
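For comparison, a minimal plain-Java sketch of why the explicit scheme helps (assuming the same path from the thread): with "file:" present, the drive letter stays inside the path component instead of being parsed as the scheme.

```java
import java.net.URI;

public class ExplicitFileScheme {
    public static void main(String[] args) {
        // With an explicit "file:" scheme, the drive letter is part of
        // the path, not the scheme.
        URI ok = URI.create("file:/D:/dir/myfile.csv");
        System.out.println(ok.getScheme()); // prints "file"
        System.out.println(ok.getPath());   // prints "/D:/dir/myfile.csv"
    }
}
```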
>
> 2016-10-20 14:41 GMT+02:00 Radu Tudoran <radu.tudoran@huawei.com>:
>
>     Hi,
>
>     I know that Flink in general supports files on Windows as well. For
>     example, I just tested successfully with relative file paths (e.g.
>     if I place the file in the local directory and give just the file
>     name, everything works correctly). However, with absolute paths it
>     does not work, as per my previous explanation. Please see the error
>     log below.
>
>     Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>     Caused by: java.io.IOException: No file system found with scheme
>     D, referenced in file URI 'D:/dir/myfile.csv'.
>         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:297)
>         at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:120)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
>
>     In addition to this, there is a further error if I dig through
>     the output logs:
>
>     4:33:32,651 ERROR org.apache.hadoop.util.Shell - Failed to locate
>     the winutils binary in the hadoop binary path
>
>     java.io.IOException: Could not locate executable
>     null\bin\winutils.exe in the Hadoop binaries.
>         at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
>         at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
>         at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
>         at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
>         at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:92)
>         at org.apache.hadoop.security.Groups.<init>(Groups.java:76)
>         at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:239)
>         at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
>         at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>         at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
>         at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
>         at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
>         at org.apache.hadoop.fs.viewfs.ViewFileSystem.<init>(ViewFileSystem.java:130)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>         at java.lang.Class.newInstance(Class.java:379)
>         at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
>         at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
>         at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2283)
>         at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2294)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getDefaultHDFSClass(HadoopFileSystem.java:91)
>         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.<init>(HadoopFileSystem.java:75)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>         at org.apache.flink.core.fs.FileSystem.instantiateHadoopFileSystemWrapper(FileSystem.java:334)
>         at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:358)
>         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>         at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:120)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
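The winutils message above is a separate, well-known Hadoop-on-Windows issue: Hadoop's `Shell` class looks for `winutils.exe` under the `hadoop.home.dir` system property (falling back to the `HADOOP_HOME` environment variable), and here neither is set, hence the `null\bin\winutils.exe` path. A minimal sketch of the usual workaround, with an assumed install location:

```java
public class WinutilsWorkaround {
    public static void main(String[] args) {
        // Hypothetical install path — adjust to wherever winutils.exe
        // actually lives; it must be at <hadoop.home.dir>\bin\winutils.exe.
        // This must run before any Hadoop class is first loaded.
        System.setProperty("hadoop.home.dir", "C:\\hadoop");
        System.out.println(System.getProperty("hadoop.home.dir"));
    }
}
```

Setting the `HADOOP_HOME` environment variable before launching the JVM achieves the same effect without code changes.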
>
>     *From:* Chesnay Schepler [mailto:chesnay@apache.org]
>     *Sent:* Thursday, October 20, 2016 2:22 PM
>     *To:* user@flink.apache.org
>     *Subject:* Re: org.apache.flink.core.fs.Path error?
>
>     Hello Radu,
>
>     Flink can handle Windows paths, so this alone can't be the problem.
>     If you could post the error you are getting, we may be able to
>     pinpoint the issue, but right now I would suggest the usual: check
>     that the path is indeed correct and that you have sufficient
>     permissions to access the file.
>
>     And yes, you can report problems here ;)
>
>     Regards,
>     Chesnay
>
>     On 20.10.2016 13:17, Radu Tudoran wrote:
>
>         Hi,
>
>         I am running a program that is supposed to read a CSV file from
>         the local disk (I am still using Flink 1.1; I did not check
>         whether the situation is the same for 1.2). I am currently
>         running the test on a Windows OS.
>
>         I am creating the path to the file e.g. “D:\\dir\\myfile.csv”
>
>         However, I see that the CSV reader converts this to a Path
>         object from flink core
>
>         “val inputFormat = new TupleCsvInputFormat(new Path(path),
>         rowDelim, fieldDelim, typeInfo)” in CSVTableSource.
>
>         This ends up representing the initial path as a URI and changes
>         \ to /, resulting in “D:/dir/myfile.csv”. The problem is that
>         this is never converted back when the file is actually opened
>         and accessed, which leads to an error.
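A hypothetical sketch (not Flink's actual code) of the kind of normalization the reporter describes: convert backslashes and attach an explicit scheme so the drive letter cannot be mistaken for a URI scheme.

```java
public class WindowsPathNormalize {
    // Hypothetical helper, for illustration only: normalize a Windows
    // path and prepend "file:" when it starts with a drive letter.
    static String toFileUri(String path) {
        String s = path.replace('\\', '/');
        if (s.matches("[a-zA-Z]:/.*")) { // starts with a drive letter
            return "file:/" + s;
        }
        return s;
    }

    public static void main(String[] args) {
        System.out.println(toFileUri("D:\\dir\\myfile.csv")); // prints "file:/D:/dir/myfile.csv"
    }
}
```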
>
>         …not sure if reporting this error here is the best place or if
>         I should have used some other channel…
>
>         Best regards,
>
>         Dr. Radu Tudoran
>
>         Senior Research Engineer - Big Data Expert
>
>         IT R&D Division
>
>         HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
>         European Research Center
>
>         Riesstrasse 25, 80992 München
>
>         E-mail: radu.tudoran@huawei.com
>
>         Mobile: +49 15209084330
>
>         Telephone: +49 891588344173
>
>         HUAWEI TECHNOLOGIES Duesseldorf GmbH
>         Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>         Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
>         56063,
>         Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>         Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
>         56063,
>         Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>
>

