crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-298: Slim down FormatBundle serialization
Date Wed, 20 Nov 2013 02:13:32 GMT
Updated Branches:
  refs/heads/master 2aa692e52 -> a724ddce5


CRUNCH-298: Slim down FormatBundle serialization


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a724ddce
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a724ddce
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a724ddce

Branch: refs/heads/master
Commit: a724ddce5686270782cad031b071f74084ccb534
Parents: 2aa692e
Author: Josh Wills <jwills@apache.org>
Authored: Tue Nov 19 17:44:21 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Nov 19 17:44:21 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/io/CrunchInputs.java    |  2 +-
 .../java/org/apache/crunch/io/CrunchOutputs.java   |  3 +--
 .../java/org/apache/crunch/io/FormatBundle.java    | 17 ++++++++---------
 3 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a724ddce/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
index d154db2..c1a0eef 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
@@ -55,7 +55,7 @@ public class CrunchInputs {
     Configuration conf = job.getConfiguration();
     for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
       List<String> fields = Lists.newArrayList(SPLITTER.split(input));
-      FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0),
InputFormat.class);
+      FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0),
job.getConfiguration());
       if (!formatNodeMap.containsKey(inputBundle)) {
         formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a724ddce/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index ccf4fb5..cd1ebce 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -83,8 +83,7 @@ public class CrunchOutputs<K, V> {
     for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
       List<String> fields = Lists.newArrayList(SPLITTER.split(input));
       String name = fields.get(0);
-      FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1),
-          OutputFormat.class);
+      FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1),
conf);
       try {
         Class<?> keyClass = Class.forName(fields.get(2));
         Class<?> valueClass = Class.forName(fields.get(3));

http://git-wip-us.apache.org/repos/asf/crunch/blob/a724ddce/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
index aa84fee..3259aaf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -20,7 +20,9 @@ package org.apache.crunch.io;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -52,17 +54,15 @@ public class FormatBundle<K> implements Serializable, Writable,
Configurable {
   private Map<String, String> extraConf;
   private Configuration conf;
   
-  public static <T> FormatBundle<T> fromSerialized(String serialized, Class<T>
clazz) {
+  public static <T> FormatBundle<T> fromSerialized(String serialized, Configuration
conf) {
     ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
     try {
-      ObjectInputStream ois = new ObjectInputStream(bais);
-      FormatBundle<T> bundle = (FormatBundle<T>) ois.readObject();
-      ois.close();
+      FormatBundle<T> bundle = new FormatBundle<T>();
+      bundle.setConf(conf);
+      bundle.readFields(new DataInputStream(bais));
       return bundle;
     } catch (IOException e) {
       throw new RuntimeException(e);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
     }
   }
 
@@ -102,9 +102,8 @@ public class FormatBundle<K> implements Serializable, Writable,
Configurable {
   public String serialize() {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
-      ObjectOutputStream oos = new ObjectOutputStream(baos);
-      oos.writeObject(this);
-      oos.close();
+      DataOutputStream dos = new DataOutputStream(baos);
+      write(dos);
       return Base64.encodeBase64String(baos.toByteArray());
     } catch (IOException e) {
       throw new RuntimeException(e);


Mime
View raw message