Ok. What I'm still not able to do is recursively remove the empty dirs from the source dir, because there's no API like getChildrenCount() or getChildren() for a given Path.
How can I do that?
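
To make the goal concrete, here is a minimal sketch of the clean-up I have in mind, written against plain java.nio.file instead of the Flink classes so that it runs on its own. The class and method names (EmptyDirCleaner, removeEmptyDirs) are made up for illustration. If the Flink FileSystem offers a way to list the children of a directory, as Hadoop's listStatus() does, the same post-order walk should carry over, but that part is an assumption on my side.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: prunes empty directories bottom-up.
final class EmptyDirCleaner {

    private EmptyDirCleaner() {}

    /**
     * Deletes dir and any of its subdirectories that contain no files.
     * Returns true if dir itself was deleted.
     */
    static boolean removeEmptyDirs(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            return false; // regular files are never deleted
        }
        // Snapshot the children so the listing is closed before anything is deleted.
        List<Path> children;
        try (Stream<Path> listing = Files.list(dir)) {
            children = listing.collect(Collectors.toList());
        }
        boolean empty = true;
        for (Path child : children) {
            // Post-order: prune the children first, so a directory that only
            // held empty directories is itself empty when it is checked.
            if (!removeEmptyDirs(child)) {
                empty = false;
            }
        }
        if (empty) {
            Files.delete(dir);
        }
        return empty;
    }
}
```

Calling EmptyDirCleaner.removeEmptyDirs(sourceDir) after the move would leave only the directories that still contain at least one file.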

On Tue, Jul 21, 2015 at 3:13 PM, Stephan Ewen <sewen@apache.org> wrote:
I don't think there is a simpler way to do this.

Flink follows the semantics of Hadoop's HDFS file system there, which behaves that way, as does the Java File class.

But it seems your solution is working, even if it needs a few extra lines of code.

On Fri, Jul 17, 2015 at 11:17 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
Of course I move the folder before the job starts or ends :)
My job does some transformation on the row data and puts the results in another folder.
The next time the job is executed, it checks whether the output folder exists and, if so, moves that folder to an archive dir.
I wanted to use the Flink client because it is FS independent, so I can choose which FS to use at runtime.
At the moment what I do is:

Path dataSourceArchivePath = new Path(rowChunksArchiveBaseDir, dataSourceId);
FileSystem fs = dataSourceArchivePath.getFileSystem();
// rename() just returns false if the parent directory is missing, so create it first
fs.mkdirs(dataSourceArchivePath.getParent());
boolean moved = fs.rename(dataSourceDirPath, dataSourceArchivePath.getParent());
LOG.info("Archiving {} to {} {}", dataSourceDirPath, dataSourceArchivePath, moved ? "successful" : "failed");

Moreover, I still have to delete the empty subPaths of the dataSourceArchivePath after the move, but I can't do that because there's no listChildren() on the Path object :(
I was looking for a simpler way to do this. Does one exist?

On Fri, Jul 17, 2015 at 10:08 AM, <fhueske@gmail.com> wrote:
Do you want to move the folder within a running job? This might cause a lot of problems, because you cannot (easily) control when a move command would be executed.

Wouldn’t it be a better idea to do that after a job is finished and use the regular HDFS client?

From: Flavio Pompermaier
Sent: Friday, 17. July, 2015 10:02
To: user@flink.apache.org

Hi to all,

in my Flink job I wanted to move a folder (containing other folders and files) to another location.
For example, I wanted to move folder X to folder Y, where my HDFS looks like:

myRootDir/X/a/aa/aaa/someFile1
myRootDir/X/b/bb/bbb/someFile2
myRootDir/Y

I tried to use rename, but it silently fails (rename just returns false) if the parent directory doesn't exist.
Is there an easy way to do that with the Flink FS APIs?
If rename() is intended to work that way, wouldn't a move() API be useful?
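
To show what I mean by move(), here is a rough sketch using plain java.nio.file rather than the Flink classes, so it can be run standalone. MoveSketch and its move() method are hypothetical names, not an existing API. The idea is simply "create the missing parent directories of the target, then rename".

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Flink: a rename that creates missing parents.
final class MoveSketch {

    private MoveSketch() {}

    /** Moves source to target, creating the parent directories of target if needed. */
    static Path move(Path source, Path target) throws IOException {
        Path parent = target.getParent();
        if (parent != null) {
            // Does nothing if the directory already exists.
            Files.createDirectories(parent);
        }
        // Throws (instead of returning false) if the move cannot be done,
        // for example when target already exists.
        return Files.move(source, target);
    }
}
```

With the layout above, MoveSketch.move(Paths.get("myRootDir/X"), Paths.get("myRootDir/Y/archive/X")) would work even though myRootDir/Y/archive does not exist yet. A failure surfaces as an exception rather than a silent false, which is the behaviour I was hoping for.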

Best,
Flavio