crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-242: Control the input/output conversion via the Source and Target interfaces
Date Tue, 23 Jul 2013 22:14:31 GMT
Updated Branches:
  refs/heads/master ebacb54c6 -> 146f5080b


CRUNCH-242: Control the input/output conversion via the Source and Target interfaces


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/146f5080
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/146f5080
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/146f5080

Branch: refs/heads/master
Commit: 146f5080b5ff90657d3d4853d69179b4fd18ce0a
Parents: ebacb54
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jul 22 17:12:11 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Jul 23 14:39:59 2013 -0700

----------------------------------------------------------------------
 .../apache/crunch/contrib/io/jdbc/DataBaseSource.java    |  5 +++++
 crunch-core/src/main/java/org/apache/crunch/Source.java  |  7 +++++++
 crunch-core/src/main/java/org/apache/crunch/Target.java  | 10 ++++++++++
 .../main/java/org/apache/crunch/impl/mr/plan/DoNode.java |  5 ++---
 .../org/apache/crunch/impl/mr/plan/JobPrototype.java     |  9 +++++----
 .../java/org/apache/crunch/io/impl/FileSourceImpl.java   |  6 ++++++
 .../java/org/apache/crunch/io/impl/FileTargetImpl.java   |  7 +++++++
 .../java/org/apache/crunch/io/impl/SourceTargetImpl.java | 11 +++++++++++
 .../apache/crunch/impl/mr/plan/JobNameBuilderTest.java   |  2 +-
 .../org/apache/crunch/io/hbase/HBaseSourceTarget.java    |  6 ++++++
 .../java/org/apache/crunch/io/hbase/HBaseTarget.java     |  6 ++++++
 11 files changed, 66 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
index 6ba5e06..83f509f 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.Driver;
 
 import org.apache.crunch.Source;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -123,4 +124,8 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T
     return ptype;
   }
 
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return ptype.getConverter();
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
index b744c8f..b0a0449 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Source.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -19,6 +19,7 @@ package org.apache.crunch;
 
 import java.io.IOException;
 
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
@@ -35,6 +36,12 @@ public interface Source<T> {
   PType<T> getType();
 
   /**
+   * Returns the {@code Converter} used for mapping the inputs from this instance
+   * into {@code PCollection} or {@code PTable} values.
+   */
+  Converter<?, ?, ?, ?> getConverter();
+  
+  /**
    * Configure the given job to use this source as an input.
    * 
    * @param job

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 48dc2cd..65ad67d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -18,6 +18,7 @@
 package org.apache.crunch;
 
 import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 
@@ -80,6 +81,15 @@ public interface Target {
   boolean accept(OutputHandler handler, PType<?> ptype);
 
   /**
+   * Returns the {@code Converter} to use for mapping from the output {@code PCollection}
+   * into the output values expected by this instance.
+   * 
+   * @param ptype The {@code PType} of the data that is being written to this instance
+   * @return A valid {@code Converter} for the output represented by this instance
+   */
+  Converter<?, ?, ?, ?> getConverter(PType<?> ptype);
+  
+  /**
    * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target}
    * for the given {@code PType}, if possible. If it is not possible, return {@code null}.
    * 

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index 2d6d590..87d0a5b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -63,8 +63,7 @@ public class DoNode {
     return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null);
   }
 
-  public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
-    Converter outputConverter = ptype.getConverter();
+  public static DoNode createOutputNode(String name, Converter outputConverter, PType<?> ptype) {
     DoFn<?, ?> fn = ptype.getOutputMapFn();
     return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null);
   }
@@ -135,7 +134,7 @@ public class DoNode {
     Converter inputConverter = null;
     if (inputNode) {
       if (nodeContext == NodeContext.MAP) {
-        inputConverter = ptype.getConverter();
+        inputConverter = source.getConverter();
       } else {
         inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index da13611..c733323 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -36,6 +36,7 @@ import org.apache.crunch.impl.mr.run.CrunchMapper;
 import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -149,8 +150,8 @@ class JobPrototype {
       DoNode node = null;
       for (NodePath nodePath : targetsToNodePaths.get(target)) {
         if (node == null) {
-          PCollectionImpl<?> collect = nodePath.tail();
-          node = DoNode.createOutputNode(target.toString(), collect.getPType());
+          PType<?> ptype = nodePath.tail().getPType();
+          node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
           outputHandler.configureNode(node, target);
         }
         outputNodes.add(walkPath(nodePath.descendingIterator(), node));
@@ -163,8 +164,8 @@ class JobPrototype {
         DoNode node = null;
         for (NodePath nodePath : mapSideNodePaths.get(target)) {
           if (node == null) {
-            PCollectionImpl<?> collect = nodePath.tail();
-            node = DoNode.createOutputNode(target.toString(), collect.getPType());
+            PType<?> ptype = nodePath.tail().getPType();
+            node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
             outputHandler.configureNode(node, target);
           }
           mapSideNodes.add(walkPath(nodePath.descendingIterator(), node));

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index b232abb..13645ba 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -31,6 +31,7 @@ import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -84,6 +85,11 @@ public class FileSourceImpl<T> implements Source<T> {
   }
   
   @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return ptype.getConverter();
+  }
+  
+  @Override
   public void configureSource(Job job, int inputId) throws IOException {
     if (inputId == -1) {
       for (Path path : paths) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 07c63df..cbd87e3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -86,6 +86,12 @@ public class FileTargetImpl implements PathTarget {
     return true;
   }
 
+  @Override
+  public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
+    return ptype.getConverter();
+  }
+
+  @Override
   public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
     Path src = getSourcePattern(workingPath, index);
@@ -254,4 +260,5 @@ public class FileTargetImpl implements PathTarget {
     }
     return exists;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 5dd4d69..68c9430 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -24,6 +24,7 @@ import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
@@ -94,4 +95,14 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
   public long getLastModifiedAt(Configuration configuration) {
     return source.getLastModifiedAt(configuration);
   }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return source.getConverter();
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
+    return target.getConverter(ptype);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
index 7963c83..0a30fa4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
@@ -30,7 +30,7 @@ public class JobNameBuilderTest {
   public void testBuild() {
     final String pipelineName = "PipelineName";
     final String nodeName = "outputNode";
-    DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings());
+    DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings().getConverter(), Writables.strings());
     JobNameBuilder jobNameBuilder = new JobNameBuilder(pipelineName);
     jobNameBuilder.visit(Lists.newArrayList(doNode));
     String jobName = jobNameBuilder.build();

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 6a5a124..2f5a160 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -30,6 +30,7 @@ import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
 import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
@@ -130,4 +131,9 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
     LOG.warn("Cannot determine last modified time for source: " + toString());
     return -1;
   }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return PTYPE.getConverter();
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 83d62c8..69a260e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -28,6 +28,7 @@ import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.MapReduceTarget;
 import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -120,4 +121,9 @@ public class HBaseTarget implements MapReduceTarget {
     LOG.info("HBaseTarget ignores checks for existing outputs...");
     return false;
   }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter(final PType<?> ptype) {
+    return ptype.getConverter();
+  }
 }


Mime
View raw message