flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/7] flink git commit: [FLINK-2702] [examples] Add a distributed copy utility example
Date Fri, 18 Sep 2015 11:49:03 GMT
[FLINK-2702] [examples] Add a distributed copy utility example

This closes #1090


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9148b66
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9148b66
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9148b66

Branch: refs/heads/master
Commit: b9148b667d24d4d30a0fd877848c1178e4be647a
Parents: 0c5ebc8
Author: Vyacheslav Zholudev <vyacheslav.zholudev@researchgate.com>
Authored: Thu Sep 3 16:09:18 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Sep 18 11:49:51 2015 +0200

----------------------------------------------------------------------
 .../flink/examples/java/distcp/DistCp.java      | 178 +++++++++++++++++++
 .../examples/java/distcp/FileCopyTask.java      |  56 ++++++
 .../java/distcp/FileCopyTaskInputFormat.java    | 110 ++++++++++++
 .../java/distcp/FileCopyTaskInputSplit.java     |  42 +++++
 4 files changed, 386 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9148b66/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
new file mode 100644
index 0000000..3eae211
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A main class of the Flink distcp utility.
+ * It's a simple reimplementation of Hadoop distcp
+ * (see <a href="http://hadoop.apache.org/docs/r1.2.1/distcp.html">http://hadoop.apache.org/docs/r1.2.1/distcp.html</a>)
+ * with a dynamic input format.
+ * Note that this tool does not deal with retriability. Additionally, empty directories are not copied over.
+ * <p>
+ * When running locally, local file systems paths can be used.
+ * However, in a distributed environment HDFS paths must be provided both as input and output.
+ */
+public class DistCp {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
+	/** Name of the accumulator that sums the number of bytes copied by all tasks. */
+	public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
+	/** Name of the accumulator that counts the number of files copied by all tasks. */
+	public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
+
+	/**
+	 * Entry point. Lists all files below the source path, turns each one into a
+	 * {@link FileCopyTask} and copies them in a flat-map function, then logs the accumulators.
+	 *
+	 * @param args exactly three arguments: input path, output path, level of parallelism
+	 * @throws Exception if listing the source, validating the parameters or running the job fails
+	 */
+	public static void main(String[] args) throws Exception {
+		if (args.length != 3) {
+			printHelp();
+			System.exit(1);
+		}
+
+		final Path sourcePath = new Path(args[0]);
+		final Path targetPath = new Path(args[1]);
+		// parseInt avoids the needless boxing of Integer.valueOf; a malformed number still
+		// surfaces as a NumberFormatException
+		int parallelism = Integer.parseInt(args[2]);
+
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		checkInputParams(env, sourcePath, targetPath, parallelism);
+		env.setParallelism(parallelism);
+
+		long startTime = System.currentTimeMillis();
+		LOGGER.info("Initializing copy tasks");
+		List<FileCopyTask> tasks = getCopyTasks(sourcePath);
+		LOGGER.info("Copy task initialization took " + (System.currentTimeMillis() - startTime) + "ms");
+
+		DataSet<FileCopyTask> inputTasks = new DataSource<>(env,
+				new FileCopyTaskInputFormat(tasks),
+				new GenericTypeInfo<>(FileCopyTask.class), "fileCopyTasks");
+
+
+		FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
+			private LongCounter fileCounter;
+			private LongCounter bytesCounter;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
+				fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
+			}
+
+			@Override
+			public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
+				LOGGER.info("Processing task: " + task);
+				Path outPath = new Path(targetPath, task.getRelativePath());
+
+				FileSystem targetFs = targetPath.getFileSystem();
+				// creating parent folders in case of a local FS
+				if (!targetFs.isDistributedFS()) {
+					//dealing with cases like file:///tmp or just /tmp
+					File outFile = outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new File(outPath.toString());
+					File parentFile = outFile.getParentFile();
+					// mkdirs() also returns false when the directory already exists, hence the second check
+					if (!parentFile.mkdirs() && !parentFile.exists()) {
+						throw new RuntimeException("Cannot create local file system directories: " + parentFile);
+					}
+				}
+				FSDataOutputStream outputStream = null;
+				FSDataInputStream inputStream = null;
+				try {
+					outputStream = targetFs.create(outPath, true);
+					inputStream = task.getPath().getFileSystem().open(task.getPath());
+					// IOUtils.copy returns an int and reports -1 for streams larger than 2 GB,
+					// which would corrupt the byte counter; copyLarge returns the real count as a long
+					long bytes = IOUtils.copyLarge(inputStream, outputStream);
+					bytesCounter.add(bytes);
+				} finally {
+					IOUtils.closeQuietly(inputStream);
+					IOUtils.closeQuietly(outputStream);
+				}
+				fileCounter.add(1L);
+			}
+		});
+
+		// no data sinks are needed, therefore just printing an empty result
+		res.print();
+
+		Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();
+		LOGGER.info("== COUNTERS ==");
+		for (Map.Entry<String, Object> e : accumulators.entrySet()) {
+			LOGGER.info(e.getKey() + ": " + e.getValue());
+		}
+	}
+
+
+	// -----------------------------------------------------------------------------------------
+	// HELPER METHODS
+	// -----------------------------------------------------------------------------------------
+
+	/**
+	 * Validates the command line parameters.
+	 *
+	 * @throws IllegalArgumentException if the parallelism is not positive, or if the environment is
+	 *         not a {@link LocalEnvironment} and either path is not on a distributed file system
+	 * @throws IOException if a file system for one of the paths cannot be obtained
+	 */
+	private static void checkInputParams(ExecutionEnvironment env, Path sourcePath, Path targetPath, int parallelism) throws IOException {
+		if (parallelism <= 0) {
+			throw new IllegalArgumentException("Parallelism should be greater than 0");
+		}
+
+		boolean isLocal = env instanceof LocalEnvironment;
+		if (!isLocal &&
+				!(sourcePath.getFileSystem().isDistributedFS() && targetPath.getFileSystem().isDistributedFS())) {
+			throw new IllegalArgumentException("In a distributed mode only HDFS input/output paths are supported");
+		}
+	}
+
+	/** Prints the expected command line arguments to standard error. */
+	private static void printHelp() {
+		System.err.println("Usage: <input_path> <output_path> <level_of_parallelism>");
+	}
+
+	/**
+	 * Collects one copy task for every file found below the given path.
+	 *
+	 * @param sourcePath root of the tree to list
+	 * @return the collected tasks; directories themselves produce no task
+	 * @throws IOException if listing a directory fails
+	 */
+	private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {
+		List<FileCopyTask> tasks = new ArrayList<>();
+		getCopyTasks(sourcePath, "", tasks);
+		return tasks;
+	}
+
+	/**
+	 * Recursive worker for {@link #getCopyTasks(Path)}.
+	 *
+	 * @param p     path whose entries are listed
+	 * @param rel   path of {@code p} relative to the copy root, either empty or ending in "/"
+	 * @param tasks list the discovered tasks are appended to
+	 * @throws IOException if listing a directory fails
+	 */
+	private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
+		FileStatus[] res = p.getFileSystem().listStatus(p);
+		if (res == null) {
+			return;
+		}
+		for (FileStatus fs : res) {
+			if (fs.isDir()) {
+				getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks);
+			} else {
+				Path cp = fs.getPath();
+				tasks.add(new FileCopyTask(cp, rel + cp.getName()));
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9148b66/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
new file mode 100644
index 0000000..3778775
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A Java POJO that represents a task for copying a single file.
+ * Instances are immutable: both values are set once in the constructor.
+ */
+public class FileCopyTask implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private final Path path;
+	private final String relativePath;
+
+	/**
+	 * @param path         path of the file to copy
+	 * @param relativePath path of the file relative to the copy root; must not be empty
+	 * @throws IllegalArgumentException if {@code relativePath} is null or empty
+	 */
+	public FileCopyTask(Path path, String relativePath) {
+		if (StringUtils.isEmpty(relativePath)) {
+			throw new IllegalArgumentException("Relative path should not be empty for: " + path);
+		}
+		this.path = path;
+		this.relativePath = relativePath;
+	}
+
+	/** @return the path of the file to copy */
+	public Path getPath() {
+		return path;
+	}
+
+	/** @return the path of the file relative to the copy root */
+	public String getRelativePath() {
+		return relativePath;
+	}
+
+	@Override
+	public String toString() {
+		return "FileCopyTask{" +
+				"path=" + path +
+				", relativePath='" + relativePath + '\'' +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9148b66/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
new file mode 100644
index 0000000..3ac872a
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * An implementation of an input format that dynamically assigns {@code FileCopyTask} to the mappers
+ * that have finished previously assigned tasks.
+ * <p>
+ * Every task becomes its own input split, and every split yields exactly one record.
+ */
+public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
+
+	private final List<FileCopyTask> tasks;
+	/** The split currently being read; reset to null once its single record was returned. */
+	private FileCopyTaskInputSplit curInputSplit = null;
+
+	public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
+		this.tasks = tasks;
+	}
+
+	/**
+	 * Hands out the splits one by one in their original order, regardless of the requesting host.
+	 * Declared static so that the assigner does not keep a hidden reference to the input format.
+	 */
+	private static class FileCopyTaskAssigner implements InputSplitAssigner {
+		private final Queue<FileCopyTaskInputSplit> splits;
+
+		public FileCopyTaskAssigner(FileCopyTaskInputSplit[] inputSplits) {
+			splits = new LinkedList<>(Arrays.asList(inputSplits));
+		}
+
+		/**
+		 * Returns the next unassigned split, or null when all splits were handed out.
+		 * Synchronized because the backing LinkedList is not safe for concurrent polling.
+		 */
+		@Override
+		public synchronized InputSplit getNextInputSplit(String host, int taskId) {
+			LOGGER.info("Getting copy task for task: " + taskId);
+			return splits.poll();
+		}
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		//no op
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+		// no statistics are provided for this format
+		return null;
+	}
+
+	/**
+	 * Creates one split per copy task; {@code minNumSplits} is ignored.
+	 */
+	@Override
+	public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+		FileCopyTaskInputSplit[] splits = new FileCopyTaskInputSplit[tasks.size()];
+		int i = 0;
+		for (FileCopyTask t : tasks) {
+			splits[i++] = new FileCopyTaskInputSplit(t);
+		}
+		return splits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(FileCopyTaskInputSplit[] inputSplits) {
+		return new FileCopyTaskAssigner(inputSplits);
+	}
+
+	@Override
+	public void open(FileCopyTaskInputSplit split) throws IOException {
+		curInputSplit = split;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return curInputSplit == null;
+	}
+
+	/**
+	 * Returns the task of the current split and marks the split as consumed.
+	 * Must only be called while {@link #reachedEnd()} is false; {@code reuse} is ignored.
+	 */
+	@Override
+	public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException {
+		FileCopyTask toReturn = curInputSplit.getTask();
+		curInputSplit = null;
+		return toReturn;
+	}
+
+	@Override
+	public void close() throws IOException {
+		//no op
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9148b66/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
new file mode 100644
index 0000000..8ee5e09
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.flink.core.io.InputSplit;
+
+/**
+ * Implementation of {@code InputSplit} for copying files.
+ * Each split wraps exactly one {@link FileCopyTask}.
+ */
+public class FileCopyTaskInputSplit implements InputSplit {
+	private static final long serialVersionUID = 1L;
+
+	private final FileCopyTask task;
+	private final int splitNumber;
+
+	/**
+	 * Creates a split with split number 0.
+	 *
+	 * @param task the copy task carried by this split
+	 */
+	public FileCopyTaskInputSplit(FileCopyTask task) {
+		this(task, 0);
+	}
+
+	/**
+	 * Creates a split with an explicit split number.
+	 *
+	 * @param task        the copy task carried by this split
+	 * @param splitNumber the number reported by {@link #getSplitNumber()}
+	 */
+	public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber) {
+		this.task = task;
+		this.splitNumber = splitNumber;
+	}
+
+	/** @return the copy task carried by this split */
+	public FileCopyTask getTask() {
+		return task;
+	}
+
+	/**
+	 * Returns the number of this split. The value is fixed at construction time, so repeated
+	 * calls return the same number (the getter no longer increments an internal counter).
+	 */
+	@Override
+	public int getSplitNumber() {
+		return splitNumber;
+	}
+}


Mime
View raw message