beam-commits mailing list archives

From: ieme...@apache.org
Subject: [2/3] beam git commit: Refactor HadoopInputFormatIO to use SerializableConfiguration from hadoop-common
Date: Wed, 31 May 2017 07:18:29 GMT
Refactor HadoopInputFormatIO to use SerializableConfiguration from hadoop-common


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/636eaff0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/636eaff0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/636eaff0

Branch: refs/heads/master
Commit: 636eaff03646113daf868949734199f5697bdf0d
Parents: 2fa24d8
Author: Ismaël Mejía <iemejia@apache.org>
Authored: Tue May 2 01:33:27 2017 +0200
Committer: Ismaël Mejía <iemejia@apache.org>
Committed: Wed May 31 09:17:00 2017 +0200

----------------------------------------------------------------------
 .../hadoop/inputformat/HadoopInputFormatIO.java | 53 ++-----------
 .../inputformat/HadoopInputFormatIOTest.java    | 80 ++++++++++----------
 2 files changed, 47 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
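For readers skimming the diff: the IO previously carried its own Externalizable wrapper around Hadoop's Configuration (deleted below), and this commit switches to the shared org.apache.beam.sdk.io.hadoop.SerializableConfiguration, whose accessor is get() rather than getHadoopConfiguration(). A minimal sketch of the new call pattern follows; the configuration key and value here are illustrative, not from the commit.

    import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
    import org.apache.hadoop.conf.Configuration;

    class SerializableConfigurationExample {
      static Configuration roundTripAccessor() {
        // Build a Hadoop Configuration and wrap it so it can cross Java
        // serialization boundaries inside the pipeline.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("mapreduce.job.inputformat.class",
            "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
        SerializableConfiguration serConf = new SerializableConfiguration(hadoopConf);
        // After this commit the wrapped Configuration is read back with get(),
        // replacing the IO-local getHadoopConfiguration() accessor.
        return serConf.get();
      }
    }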


http://git-wip-us.apache.org/repos/asf/beam/blob/636eaff0/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 336740c..efd47fd 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -23,11 +23,8 @@ import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AtomicDouble;
-import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInput;
 import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -46,6 +43,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.hadoop.WritableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -432,7 +430,7 @@ public class HadoopInputFormatIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      Configuration hadoopConfig = getConfiguration().getHadoopConfiguration();
+      Configuration hadoopConfig = getConfiguration().get();
       if (hadoopConfig != null) {
         builder.addIfNotNull(DisplayData.item("mapreduce.job.inputformat.class",
             hadoopConfig.get("mapreduce.job.inputformat.class"))
@@ -493,7 +491,7 @@ public class HadoopInputFormatIO {
       }
       createInputFormatInstance();
       List<InputSplit> splits =
-          inputFormatObj.getSplits(Job.getInstance(conf.getHadoopConfiguration()));
+          inputFormatObj.getSplits(Job.getInstance(conf.get()));
       if (splits == null) {
         throw new IOException("Error in computing splits, getSplits() returns null.");
       }
@@ -520,12 +518,12 @@ public class HadoopInputFormatIO {
       if (inputFormatObj == null) {
         try {
           taskAttemptContext =
-              new TaskAttemptContextImpl(conf.getHadoopConfiguration(), new TaskAttemptID());
+              new TaskAttemptContextImpl(conf.get(), new TaskAttemptID());
           inputFormatObj =
               (InputFormat<?, ?>) conf
-                  .getHadoopConfiguration()
+                  .get()
                   .getClassByName(
-                      conf.getHadoopConfiguration().get("mapreduce.job.inputformat.class"))
+                      conf.get().get("mapreduce.job.inputformat.class"))
                   .newInstance();
           /*
            * If InputFormat explicitly implements interface {@link Configurable}, then setConf()
@@ -535,7 +533,7 @@ public class HadoopInputFormatIO {
            * org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc.
            */
           if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) {
-            ((Configurable) inputFormatObj).setConf(conf.getHadoopConfiguration());
+            ((Configurable) inputFormatObj).setConf(conf.get());
           }
        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
           throw new IOException("Unable to create InputFormat object: ", e);
@@ -802,41 +800,4 @@ public class HadoopInputFormatIO {
       new ObjectWritable(inputSplit).write(out);
     }
   }
-
-  /**
-   * A wrapper to allow Hadoop {@link org.apache.hadoop.conf.Configuration} to be serialized using
-   * Java's standard serialization mechanisms. Note that the org.apache.hadoop.conf.Configuration
-   * is Writable.
-   */
-  public static class SerializableConfiguration implements Externalizable {
-
-    private Configuration conf;
-
-    public SerializableConfiguration() {}
-
-    public SerializableConfiguration(Configuration conf) {
-      this.conf = conf;
-    }
-
-    public Configuration getHadoopConfiguration() {
-      return conf;
-    }
-
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-      out.writeUTF(conf.getClass().getCanonicalName());
-      ((Writable) conf).write(out);
-    }
-
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-      String className = in.readUTF();
-      try {
-        conf = (Configuration) Class.forName(className).newInstance();
-        conf.readFields(in);
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException("Unable to create configuration: " + e);
-      }
-    }
-  }
 }
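The inner class removed above round-tripped a Configuration through Java serialization by delegating to Hadoop's Writable support; the shared hadoop-common replacement serves the same purpose. A hedged sketch of that round trip using the shared class follows, assuming it remains Java-serializable as the refactor implies; the helper class and method names are illustrative.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
    import org.apache.hadoop.conf.Configuration;

    class ConfigurationRoundTrip {
      static Configuration roundTrip(Configuration hadoopConf) throws Exception {
        // Serialize the wrapper with plain Java serialization, then read it
        // back and return the embedded Configuration.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(new SerializableConfiguration(hadoopConf));
        }
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          return ((SerializableConfiguration) in.readObject()).get();
        }
      }
    }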

http://git-wip-us.apache.org/repos/asf/beam/blob/636eaff0/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index aeeeb17..9ec3838 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -26,11 +26,11 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.hadoop.WritableCoder;
 import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.EmployeeRecordReader;
 import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsEmployeeInputSplit;
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration;
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -94,11 +94,11 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadBuildsCorrectly() {
    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslate)
         .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(myValueTranslate, read.getValueTranslationFunction());
     assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor());
@@ -116,10 +116,10 @@ public class HadoopInputFormatIOTest {
     HadoopInputFormatIO.Read<String, String> read =
         HadoopInputFormatIO.<String, String>read()
             .withValueTranslation(myValueTranslate)
-            .withConfiguration(serConf.getHadoopConfiguration())
+            .withConfiguration(serConf.get())
             .withKeyTranslation(myKeyTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(myValueTranslate, read.getValueTranslationFunction());
     assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
@@ -142,15 +142,15 @@ public class HadoopInputFormatIOTest {
             Employee.class,
             Text.class);
    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslate)
-        .withConfiguration(diffConf.getHadoopConfiguration());
-    assertEquals(diffConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+        .withConfiguration(diffConf.get());
+    assertEquals(diffConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(null, read.getValueTranslationFunction());
     assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
-    assertEquals(diffConf.getHadoopConfiguration().getClass("value.class", Object.class), read
+    assertEquals(diffConf.get().getClass("value.class", Object.class), read
         .getValueTypeDescriptor().getRawType());
   }
 
@@ -173,14 +173,14 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadObjectCreationWithConfiguration() {
     HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration());
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+        .withConfiguration(serConf.get());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(null, read.getKeyTranslationFunction());
     assertEquals(null, read.getValueTranslationFunction());
-    assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class), read
+    assertEquals(serConf.get().getClass("key.class", Object.class), read
         .getKeyTypeDescriptor().getRawType());
-    assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class), read
+    assertEquals(serConf.get().getClass("value.class", Object.class), read
         .getValueTypeDescriptor().getRawType());
   }
 
@@ -194,7 +194,7 @@ public class HadoopInputFormatIOTest {
   public void testReadObjectCreationFailsIfKeyTranslationFunctionIsNull() {
     thrown.expect(NullPointerException.class);
     HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(null);
   }
 
@@ -205,15 +205,15 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadObjectCreationWithConfigurationKeyTranslation() {
    HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(null, read.getValueTranslationFunction());
     assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
         read.getKeyTypeDescriptor().getRawType());
-    assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class),
+    assertEquals(serConf.get().getClass("value.class", Object.class),
         read.getValueTypeDescriptor().getRawType());
   }
 
@@ -227,7 +227,7 @@ public class HadoopInputFormatIOTest {
   public void testReadObjectCreationFailsIfValueTranslationFunctionIsNull() {
     thrown.expect(NullPointerException.class);
     HadoopInputFormatIO.<Text, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withValueTranslation(null);
   }
 
@@ -238,13 +238,13 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadObjectCreationWithConfigurationValueTranslation() {
     HadoopInputFormatIO.Read<Text, String> read = HadoopInputFormatIO.<Text, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(null, read.getKeyTranslationFunction());
     assertEquals(myValueTranslate, read.getValueTranslationFunction());
-    assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class),
+    assertEquals(serConf.get().getClass("key.class", Object.class),
         read.getKeyTypeDescriptor().getRawType());
     assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(),
         read.getValueTypeDescriptor().getRawType());
@@ -257,11 +257,11 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadObjectCreationWithConfigurationKeyTranslationValueTranslation() {
    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslate)
         .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(myValueTranslate, read.getValueTranslationFunction());
     assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
@@ -342,13 +342,13 @@ public class HadoopInputFormatIOTest {
           }
         };
    HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslateWithWrongInputType);
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(String.format(
         "Key translation's input type is not same as hadoop InputFormat : %s key " + "class
: %s",
-        serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
-            InputFormat.class), serConf.getHadoopConfiguration()
+        serConf.get().getClass("mapreduce.job.inputformat.class",
+            InputFormat.class), serConf.get()
             .getClass("key.class", Object.class)));
     read.validateTransform();
   }
@@ -370,15 +370,15 @@ public class HadoopInputFormatIOTest {
         };
     HadoopInputFormatIO.Read<Text, String> read =
         HadoopInputFormatIO.<Text, String>read()
-            .withConfiguration(serConf.getHadoopConfiguration())
+            .withConfiguration(serConf.get())
             .withValueTranslation(myValueTranslateWithWrongInputType);
     String expectedMessage =
         String.format(
             "Value translation's input type is not same as hadoop InputFormat :  "
                 + "%s value class : %s",
-            serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
+            serConf.get().getClass("mapreduce.job.inputformat.class",
                 InputFormat.class),
-            serConf.getHadoopConfiguration().getClass("value.class", Object.class));
+            serConf.get().getClass("value.class", Object.class));
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(expectedMessage);
     read.validateTransform();
@@ -387,7 +387,7 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadingData() throws Exception {
     HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration());
+        .withConfiguration(serConf.get());
     List<KV<Text, Employee>> expected = TestEmployeeDataSet.getEmployeeData();
     PCollection<KV<Text, Employee>> actual = p.apply("ReadTest", read);
     PAssert.that(actual).containsInAnyOrder(expected);
@@ -413,11 +413,11 @@ public class HadoopInputFormatIOTest {
     assertThat(
         displayData,
         hasDisplayItem("mapreduce.job.inputformat.class",
-            serConf.getHadoopConfiguration().get("mapreduce.job.inputformat.class")));
+            serConf.get().get("mapreduce.job.inputformat.class")));
     assertThat(displayData,
-        hasDisplayItem("key.class", serConf.getHadoopConfiguration().get("key.class")));
+        hasDisplayItem("key.class", serConf.get().get("key.class")));
     assertThat(displayData,
-        hasDisplayItem("value.class", serConf.getHadoopConfiguration().get("value.class")));
+        hasDisplayItem("value.class", serConf.get().get("value.class")));
   }
 
   /**
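For orientation, the accessor change does not alter how pipelines invoke the IO; the tests above still follow the standard read pattern. A hedged end-to-end sketch of that pattern, assuming serConf wraps a Configuration with the inputformat, key, and value classes already set; the class name, transform label, and key/value types are illustrative.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
    import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    class ReadPatternExample {
      static PCollection<KV<LongWritable, Text>> buildRead(SerializableConfiguration serConf) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // withConfiguration still takes a plain Hadoop Configuration; with the
        // shared wrapper, the unwrap is now serConf.get().
        return p.apply(
            "ReadViaHadoopInputFormat",
            HadoopInputFormatIO.<LongWritable, Text>read()
                .withConfiguration(serConf.get()));
      }
    }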

