crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-459: Allowing nulls as values in Avro PTableTypes
Date Fri, 08 Aug 2014 02:01:46 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 ea777acdf -> bdcff92e4


CRUNCH-459: Allowing nulls as values in Avro PTableTypes


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/bdcff92e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/bdcff92e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/bdcff92e

Branch: refs/heads/apache-crunch-0.8
Commit: bdcff92e4ddd0ad181d8cf6e3160cece51d36455
Parents: ea777ac
Author: Josh Wills <jwills@apache.org>
Authored: Thu Feb 6 12:29:36 2014 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Aug 7 18:53:06 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/EmitNullAvroIT.java  | 49 ++++++++++++++++++++
 .../apache/crunch/types/avro/AvroTableType.java | 14 +++++-
 2 files changed, 61 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/bdcff92e/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java
new file mode 100644
index 0000000..4353b90
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.avro.AvroFileTarget;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Integration test checking that an Avro-backed {@link PTable} can hold {@code null} values.
+ */
+public class EmitNullAvroIT extends CrunchTestSupport implements Serializable {
+  /**
+   * Maps every line of {@code docs.txt} to a pair with a {@code null} {@link Person} value and
+   * writes the resulting table to an Avro file. This only succeeds if the Avro table type
+   * permits {@code null} values.
+   */
+  @Test
+  public void testNullableAvroPTable() throws Exception {
+    final Pipeline p = new MRPipeline(EmitNullAvroIT.class, tempDir.getDefaultConfiguration());
+    final Path outDir = tempDir.getPath("out");
+    final PCollection<String> input = p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")));
+
+    input.parallelDo(new MapFn<String, Pair<String, Person>>() {
+      @Override
+      public Pair<String, Person> map(final String line) {
+        return new Pair<String, Person>("first name", null);
+      }
+    }, Avros.tableOf(Avros.strings(), Avros.records(Person.class)))
+        .write(new AvroFileTarget(outDir), Target.WriteMode.APPEND);
+
+    // A failed MapReduce job is reported through the PipelineResult returned by done(), so the
+    // result has to be asserted for this test to detect a non-nullable value schema.
+    assertTrue("pipeline failed while writing null Avro values", p.done().succeeded());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/bdcff92e/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index 00047cc..30db1ed 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.types.avro;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.crunch.MapFn;
@@ -68,7 +69,8 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType<
       keyMapFn.initialize();
       valueMapFn.initialize();
       pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
-          new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString();
+          new Schema.Parser().parse(firstJson),
+          nullable(new Schema.Parser().parse(secondJson))).toString();
     }
 
     @Override
@@ -122,7 +124,8 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType<
 
   public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K,
V>> pairClass) {
     super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
-        valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
+            nullable(valueType.getSchema())),
+        new IndexedRecordToPair(keyType.getInputMapFn(),
         valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType),
         new TupleDeepCopier(Pair.class, keyType, valueType), null, keyType, valueType);
     this.keyType = keyType;
@@ -148,4 +151,35 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType<
   public Pair<K, V> getDetachedValue(Pair<K, V> value) {
     return PTables.getDetachedValue(this, value);
   }
+
+  /**
+   * Returns a schema that accepts {@code null} in addition to every value {@code schema} accepts.
+   *
+   * <p>Avro does not allow a union to directly contain another union, or two branches of the
+   * same unnamed type, so a union input is extended rather than wrapped: if it already has a
+   * {@code null} branch it is returned unchanged, otherwise {@code null} is appended to its
+   * branches.
+   *
+   * @param schema the value schema to make nullable
+   * @return {@code schema} itself when it already accepts {@code null}; otherwise a union of the
+   *     original branch (or branches) followed by {@code null}
+   */
+  private static Schema nullable(Schema schema) {
+    switch (schema.getType()) {
+      case NULL:
+        return schema;
+      case UNION:
+        for (Schema branch : schema.getTypes()) {
+          if (branch.getType() == Schema.Type.NULL) {
+            return schema;
+          }
+        }
+        return Schema.createUnion(ImmutableList.<Schema>builder()
+            .addAll(schema.getTypes())
+            .add(Schema.create(Schema.Type.NULL))
+            .build());
+      default:
+        return Schema.createUnion(ImmutableList.of(schema, Schema.create(Schema.Type.NULL)));
+    }
+  }
 }


Mime
View raw message