incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-99: Handle byte[] and ByteBuffer in Avros.writables
Date Wed, 24 Oct 2012 14:14:06 GMT
Updated Branches:
  refs/heads/master 0eb69c43b -> 69105d07b


CRUNCH-99: Handle byte[] and ByteBuffer in Avros.writables

Also includes some additional header/config cleanup, and ensures
that the unit test fails if ByteBuffer and byte array handling do
not work correctly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/69105d07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/69105d07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/69105d07

Branch: refs/heads/master
Commit: 69105d07b5c5b36d132a3f00e9ac191067db808d
Parents: 0eb69c4
Author: Josh Wills <jwills@apache.org>
Authored: Fri Oct 19 18:48:51 2012 -0700
Committer: Gabriel Reid <greid@apache.org>
Committed: Wed Oct 24 16:00:35 2012 +0200

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileSourceTargetIT.java     |    1 -
 .../org/apache/crunch/io/avro/AvroPipelineIT.java  |    1 -
 .../org/apache/crunch/io/avro/AvroWritableIT.java  |   89 +++++++++++++++
 .../org/apache/crunch/types/PGroupedTableType.java |   11 ++-
 .../java/org/apache/crunch/types/avro/Avros.java   |   11 +-
 5 files changed, 104 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
index 5deacd1..671b920 100644
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
@@ -41,7 +41,6 @@ import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.avro.Avros;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
index e0a7ead..29bf4f5 100644
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.avro.Schema;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
new file mode 100644
index 0000000..cbb7fde
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.apache.crunch.types.avro.Avros.ints;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+import static org.apache.crunch.types.avro.Avros.writables;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.io.DoubleWritable;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Verify handling of both a ByteBuffer and byte array as input from an Avro job (depending
+ * on the version of Avro being used).
+ */
+public class AvroWritableIT implements Serializable {
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  @Test
+  public void testAvroBasedWritablePipeline() throws Exception {
+    String customersInputPath = tmpDir.copyResourceFileName("customers.txt");
+    Pipeline pipeline = new MRPipeline(AvroWritableIT.class, tmpDir.getDefaultConfiguration());
+    pipeline.enableDebug();
+    PCollection<String> customerLines = pipeline.readTextFile(customersInputPath);
+    Map<Integer, DoubleWritable> outputMap = customerLines.parallelDo(
+        new MapFn<String, Pair<Integer, DoubleWritable>>() {
+          @Override
+          public Pair<Integer, DoubleWritable> map(String input) {
+            int len = input.length();
+            return Pair.of(len, new DoubleWritable(len));
+          }
+        }, tableOf(ints(), writables(DoubleWritable.class)))
+    .groupByKey()
+    .combineValues(new CombineFn<Integer, DoubleWritable>() {
+      @Override
+      public void process(Pair<Integer, Iterable<DoubleWritable>> input,
+          Emitter<Pair<Integer, DoubleWritable>> emitter) {
+        double sum = 0.0;
+        for (DoubleWritable dw : input.second()) {
+          sum += dw.get();
+        }
+        emitter.emit(Pair.of(input.first(), new DoubleWritable(sum)));
+      }
+    })
+    .materializeToMap();
+    
+    Map<Integer, DoubleWritable> expectedMap = Maps.newHashMap();
+    expectedMap.put(17, new DoubleWritable(17.0));
+    expectedMap.put(16, new DoubleWritable(16.0));
+    expectedMap.put(12, new DoubleWritable(24.0));
+   
+    assertEquals(expectedMap, outputMap);
+    
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
index b4ac1e6..5718619 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -25,6 +25,7 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
@@ -74,9 +75,15 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
     }
 
     @Override
+    public void configure(Configuration conf) {
+      keys.configure(conf);
+      values.configure(conf);
+    }
+    
+    @Override
     public void initialize() {
-      keys.initialize();
-      values.initialize();
+      keys.setContext(getContext());
+      values.setContext(getContext());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 655ee55..c8a2ef5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -234,7 +234,7 @@ public class Avros {
     return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
   }
 
-  private static class BytesToWritableMapFn<T extends Writable> extends MapFn<ByteBuffer, T> {
+  private static class BytesToWritableMapFn<T extends Writable> extends MapFn<Object, T> {
     private static final Log LOG = LogFactory.getLog(BytesToWritableMapFn.class);
 
     private final Class<T> writableClazz;
@@ -244,11 +244,12 @@ public class Avros {
     }
 
     @Override
-    public T map(ByteBuffer input) {
-      T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
+    public T map(Object input) {
+      ByteBuffer byteBuffer = BYTES_IN.map(input);
+      T instance = ReflectionUtils.newInstance(writableClazz, null);
       try {
-        instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input
-            .limit())));
+        instance.readFields(new DataInputStream(new ByteArrayInputStream(byteBuffer.array(),
+            byteBuffer.arrayOffset(), byteBuffer.limit())));
       } catch (IOException e) {
         LOG.error("Exception thrown reading instance of: " + writableClazz, e);
       }


Mime
View raw message