crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject crunch git commit: CRUNCH-592: Job fails for null ByteBuffer value in Avro tables.
Date Wed, 10 Feb 2016 16:02:54 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 6463da421 -> b5b7f48eb


CRUNCH-592: Job fails for null ByteBuffer value in Avro tables.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b5b7f48e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b5b7f48e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b5b7f48e

Branch: refs/heads/master
Commit: b5b7f48ebf5a88b69084dcc08be76b8819fc00a7
Parents: 6463da4
Author: Tom White <tomwhite@apache.org>
Authored: Wed Feb 10 16:02:31 2016 +0000
Committer: Tom White <tomwhite@apache.org>
Committed: Wed Feb 10 16:02:31 2016 +0000

----------------------------------------------------------------------
 .../java/org/apache/crunch/EmitNullAvroIT.java   | 19 +++++++++++++++++++
 .../java/org/apache/crunch/types/avro/Avros.java |  9 +++++++++
 .../org/apache/crunch/types/avro/AvrosTest.java  |  2 ++
 3 files changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b5b7f48e/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java
index 4353b90..e387db5 100644
--- a/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java
@@ -19,6 +19,7 @@ package org.apache.crunch;
 
 import java.io.Serializable;
 
+import java.nio.ByteBuffer;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.avro.AvroFileTarget;
@@ -46,4 +47,22 @@ public class EmitNullAvroIT extends CrunchTestSupport implements Serializable
{
 
     p.done();
   }
+
+  @Test
+  public void testNullableAvroPTable_ByteBuffer() throws Exception {
+    final Pipeline p = new MRPipeline(EmitNullAvroIT.class, tempDir.getDefaultConfiguration());
+    final Path outDir = tempDir.getPath("out");
+    final PCollection<String> input = p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")));
+
+    input.parallelDo(new MapFn<String, Pair<String, ByteBuffer>>() {
+      @Override
+      public Pair<String, ByteBuffer> map(final String input) {
+        return new Pair<String, ByteBuffer>("first name", null);
+      }
+    }, Avros.tableOf(Avros.strings(), Avros.bytes()))
+        .groupByKey()
+        .write(new AvroFileTarget(outDir), Target.WriteMode.APPEND);
+
+    p.done();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/b5b7f48e/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 989aa24..f9afe05 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -155,6 +155,9 @@ public class Avros {
   public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence,
String>() {
     @Override
     public String map(CharSequence input) {
+      if (input == null) {
+        return null;
+      }
       return input.toString();
     }
   };
@@ -162,6 +165,9 @@ public class Avros {
   public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>()
{
     @Override
     public Utf8 map(String input) {
+      if (input == null) {
+        return null;
+      }
       return new Utf8(input);
     }
   };
@@ -169,6 +175,9 @@ public class Avros {
   public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>()
{
     @Override
     public ByteBuffer map(Object input) {
+      if (input == null) {
+        return null;
+      }
       if (input instanceof ByteBuffer) {
         return (ByteBuffer) input;
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b5b7f48e/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
index 46c295e..c002012 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
@@ -68,6 +68,7 @@ public class AvrosTest {
     String s = "abc";
     Utf8 w = new Utf8(s);
     testInputOutputFn(Avros.strings(), s, w);
+    testInputOutputFn(Avros.strings(), null, null);
   }
 
   @Test
@@ -105,6 +106,7 @@ public class AvrosTest {
     byte[] bytes = new byte[] { 17, 26, -98 };
     ByteBuffer bb = ByteBuffer.wrap(bytes);
     testInputOutputFn(Avros.bytes(), bb, bb);
+    testInputOutputFn(Avros.bytes(), null, null);
   }
 
   @Test


Mime
View raw message