crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-543: Have AvroPathPerKeyTarget handle child directories properly
Date Fri, 17 Jul 2015 14:49:01 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 16209bed6 -> a670b9169


CRUNCH-543: Have AvroPathPerKeyTarget handle child directories properly


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a670b916
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a670b916
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a670b916

Branch: refs/heads/master
Commit: a670b91697bcb89faff252dd4204d8273b6dbf2d
Parents: 16209be
Author: Josh Wills <jwills@apache.org>
Authored: Thu Jul 16 16:07:38 2015 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Jul 16 16:07:38 2015 -0700

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroPathPerKeyIT.java | 39 ++++++++++++++++++++
 .../crunch/io/avro/AvroPathPerKeyTarget.java    | 29 +++++++++++----
 2 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a670b916/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
index c1f7fa6..e674229 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
@@ -93,4 +93,43 @@ public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable
     assertFalse(fs.exists(outDir));
   }
 
+  @Test
+  public void testOutputFilePerKey_Directories() throws Exception {
+    Pipeline p = new MRPipeline(AvroPathPerKeyIT.class, tempDir.getDefaultConfiguration());
+    Path outDir = tempDir.getPath("out");
+    p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+            .parallelDo(new MapFn<String, Pair<String, String>>() {
+              @Override
+              public Pair<String, String> map(String input) {
+                String[] p = input.split("\t");
+                return Pair.of(p[0] + "/child", p[1]);
+              }
+            }, Avros.tableOf(Avros.strings(), Avros.strings()))
+            .groupByKey()
+            .write(new AvroPathPerKeyTarget(outDir));
+    p.done();
+
+    Set<String> names = Sets.newHashSet();
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    for (FileStatus fstat : fs.listStatus(outDir)) {
+      names.add(fstat.getPath().getName());
+    }
+    assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), names);
+
+    Path aParent = new Path(outDir, "A");
+    FileStatus[] aParentStat = fs.listStatus(aParent);
+    assertEquals(1, aParentStat.length);
+    assertEquals("child", aParentStat[0].getPath().getName());
+    FileStatus[] aChildStat = fs.listStatus(new Path(aParent, "child"));
+    assertEquals(1, aChildStat.length);
+    assertEquals("part-r-00000.avro", aChildStat[0].getPath().getName());
+
+    Path bParent = new Path(outDir, "B");
+    FileStatus[] bParentStat = fs.listStatus(bParent);
+    assertEquals(1, bParentStat.length);
+    assertEquals("child", bParentStat[0].getPath().getName());
+    FileStatus[] bChildStat = fs.listStatus(new Path(bParent, "child"));
+    assertEquals(1, bChildStat.length);
+    assertEquals("part-r-00000.avro", bChildStat[0].getPath().getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a670b916/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index 336b940..d17e5d7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -91,26 +91,41 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
       LOG.warn("Nothing to copy from {}", base);
       return;
     }
-    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base));
     FileSystem dstFs = path.getFileSystem(conf);
+    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base));
     if (!dstFs.exists(path)) {
       dstFs.mkdirs(path);
     }
     boolean sameFs = isCompatible(srcFs, path);
+    move(conf, base, srcFs, path, dstFs, sameFs);
+    dstFs.create(getSuccessIndicator(), true).close();
+  }
+
+  private void move(Configuration conf, Path srcBase, FileSystem srcFs, Path dstBase, FileSystem dstFs, boolean sameFs)
+      throws IOException {
+    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(srcBase));
+    if (!dstFs.exists(dstBase)) {
+      dstFs.mkdirs(dstBase);
+    }
     for (Path key : keys) {
       Path[] srcs = FileUtil.stat2Paths(srcFs.listStatus(key), key);
-      Path targetPath = new Path(path, key.getName());
+      Path targetPath = new Path(dstBase, key.getName());
       dstFs.mkdirs(targetPath);
       for (Path s : srcs) {
-        Path d = getDestFile(conf, s, targetPath, s.getName().contains("-m-"));
-        if (sameFs) {
-          srcFs.rename(s, d);
+        if (srcFs.isDirectory(s)) {
+          Path nextBase = new Path(targetPath, s.getName());
+          dstFs.mkdirs(nextBase);
+          move(conf, s, srcFs, nextBase, dstFs, sameFs);
         } else {
-          FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
+          Path d = getDestFile(conf, s, targetPath, s.getName().contains("-m-"));
+          if (sameFs) {
+            srcFs.rename(s, d);
+          } else {
+            FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
+          }
         }
       }
     }
-    dstFs.create(getSuccessIndicator(), true).close();
   }
 
   @Override


Mime
View raw message