incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-47: Switch FS.get(conf) references to path.getFileSystem so Crunch will run with S3FileSystem
Date Tue, 14 Aug 2012 23:13:31 GMT
Updated Branches:
  refs/heads/master 33f53e629 -> 04438473c


CRUNCH-47: Switch FS.get(conf) references to path.getFileSystem so Crunch will run with S3FileSystem


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/04438473
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/04438473
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/04438473

Branch: refs/heads/master
Commit: 04438473c7bd8681331aa180acf7e3c2c2d3faef
Parents: 33f53e6
Author: jwills <jwills@apache.org>
Authored: Tue Aug 14 15:43:57 2012 -0700
Committer: jwills <jwills@apache.org>
Committed: Tue Aug 14 15:43:57 2012 -0700

----------------------------------------------------------------------
 .../lib/jobcontrol/CrunchControlledJob.java        |   10 +++---
 .../org/apache/crunch/impl/mem/MemPipeline.java    |    2 +-
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |    4 +-
 .../org/apache/crunch/impl/mr/exec/CrunchJob.java  |   28 +++++++++++---
 .../org/apache/crunch/io/SourceTargetHelper.java   |    2 +-
 .../org/apache/crunch/io/avro/AvroFileSource.java  |    2 +-
 .../org/apache/crunch/io/seq/SeqFileSource.java    |    2 +-
 .../apache/crunch/io/seq/SeqFileTableSource.java   |    2 +-
 .../org/apache/crunch/io/text/TextFileSource.java  |    3 +-
 .../java/org/apache/crunch/util/DistCache.java     |    2 +-
 10 files changed, 36 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 205e8e3..396ea2d 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -316,12 +316,12 @@ public class CrunchControlledJob {
     try {
       Configuration conf = job.getConfiguration();
       if (conf.getBoolean(CREATE_DIR, false)) {
-        FileSystem fs = FileSystem.get(conf);
-        Path inputPaths[] = FileInputFormat.getInputPaths(job);
-        for (int i = 0; i < inputPaths.length; i++) {
-          if (!fs.exists(inputPaths[i])) {
+        Path[] inputPaths = FileInputFormat.getInputPaths(job);
+        for (Path inputPath : inputPaths) {
+          FileSystem fs = inputPath.getFileSystem(conf);
+          if (!fs.exists(inputPath)) {
             try {
-              fs.mkdirs(inputPaths[i]);
+              fs.mkdirs(inputPath);
             } catch (IOException e) {
 
             }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 767524c..77c41ce 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -147,7 +147,7 @@ public class MemPipeline implements Pipeline {
     if (target instanceof PathTarget) {
       Path path = ((PathTarget) target).getPath();
       try {
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs = path.getFileSystem(conf);
         FSDataOutputStream os = fs.create(new Path(path, "out"));
         if (collection instanceof PTable) {
           for (Object o : collection.materialize()) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index ec03781..d9ee0c1 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -267,7 +267,7 @@ public class MRPipeline implements Pipeline {
   private static Path createTempDirectory(Configuration conf) {
     Path dir = createTemporaryPath(conf);
     try {
-      FileSystem.get(conf).mkdirs(dir);
+      dir.getFileSystem(conf).mkdirs(dir);
     } catch (IOException e) {
       throw new RuntimeException("Cannot create job output directory " + dir, e);
     }
@@ -293,7 +293,7 @@ public class MRPipeline implements Pipeline {
       return;
     }
     try {
-      FileSystem fs = FileSystem.get(conf);
+      FileSystem fs = tempDirectory.getFileSystem(conf);
       if (fs.exists(tempDirectory)) {
         fs.delete(tempDirectory, true);
       }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
index 8327f58..85fae63 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
@@ -52,22 +52,38 @@ public class CrunchJob extends CrunchControlledJob {
     if (!multiPaths.isEmpty()) {
       // Need to handle moving the data from the output directory of the
       // job to the output locations specified in the paths.
-      FileSystem fs = FileSystem.get(job.getConfiguration());
+      FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
       for (int i = 0; i < multiPaths.size(); i++) {
         Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
-        Path[] srcs = FileUtil.stat2Paths(fs.globStatus(src), src);
+        Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
         Path dst = multiPaths.get(i);
-        if (!fs.exists(dst)) {
-          fs.mkdirs(dst);
+        FileSystem dstFs = dst.getFileSystem(job.getConfiguration());
+        if (!dstFs.exists(dst)) {
+          dstFs.mkdirs(dst);
         }
-        int minPartIndex = getMinPartIndex(dst, fs);
+        boolean sameFs = isCompatible(srcFs, dst);
+        int minPartIndex = getMinPartIndex(dst, dstFs);
         for (Path s : srcs) {
-          fs.rename(s, getDestFile(s, dst, minPartIndex++));
+          Path d = getDestFile(s, dst, minPartIndex++);
+          if (sameFs) {
+            srcFs.rename(s, d);
+          } else {
+            FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration());
+          }
         }
       }
     }
   }
 
+  private boolean isCompatible(FileSystem fs, Path path) {
+    try {
+      fs.makeQualified(path);
+      return true;
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
   private Path getDestFile(Path src, Path dir, int index) {
     String form = "part-%s-%05d";
     if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
index f52bfe7..bbe5eaa 100644
--- a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
+++ b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
@@ -36,7 +36,7 @@ public class SourceTargetHelper {
   private static final Log LOG = LogFactory.getLog(SourceTargetHelper.class);
 
   public static long getPathSize(Configuration conf, Path path) throws IOException {
-    return getPathSize(FileSystem.get(conf), path);
+    return getPathSize(path.getFileSystem(conf), path);
   }
 
   public static long getPathSize(FileSystem fs, Path path) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 8bfb4dc..0ce4c06 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -47,7 +47,7 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FileSystem fs = path.getFileSystem(conf);
     return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>)
ptype, conf));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
index 6f0ad05..e8f3dcf 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
@@ -36,7 +36,7 @@ public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSourc
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FileSystem fs = path.getFileSystem(conf);
     return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype,
conf));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
index a846d66..56ed985 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
@@ -44,7 +44,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
 
   @Override
   public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FileSystem fs = path.getFileSystem(conf);
     return CompositePathIterable.create(fs, path, new SeqFileTableReaderFactory<K, V>((PTableType<K,
V>) ptype, conf));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
index 6310995..ee51c04 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
@@ -26,7 +26,6 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.AvroUtf8InputFormat;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -68,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path, new TextFileReaderFactory<T>(ptype,
+    return CompositePathIterable.create(path.getFileSystem(conf), path, new TextFileReaderFactory<T>(ptype,
         conf));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/04438473/crunch/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/DistCache.java b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
index 6ab2a50..20675d2 100644
--- a/crunch/src/main/java/org/apache/crunch/util/DistCache.java
+++ b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
@@ -51,7 +51,7 @@ public class DistCache {
   private static final String TMPJARS_KEY = "tmpjars";
 
   public static void write(Configuration conf, Path path, Object value) throws IOException
{
-    ObjectOutputStream oos = new ObjectOutputStream(FileSystem.get(conf).create(path));
+    ObjectOutputStream oos = new ObjectOutputStream(path.getFileSystem(conf).create(path));
     oos.writeObject(value);
     oos.close();
 


Mime
View raw message