beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] beam git commit: [BEAM-1146] Decrease spark runner startup overhead
Date Mon, 02 Jan 2017 13:52:33 GMT
Repository: beam
Updated Branches:
  refs/heads/master e136f12c3 -> ee69825ea


[BEAM-1146] Decrease spark runner startup overhead


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0fe004d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0fe004d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0fe004d

Branch: refs/heads/master
Commit: d0fe004db98b4f1743939f357da4193bc3759f77
Parents: e136f12
Author: Aviem Zur <aviemzur@gmail.com>
Authored: Mon Jan 2 14:42:47 2017 +0200
Committer: Aviem Zur <aviemzur@gmail.com>
Committed: Mon Jan 2 14:52:37 2017 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  5 --
 .../coders/BeamSparkRunnerRegistrator.java      | 48 ++---------------
 .../coders/BeamSparkRunnerRegistratorTest.java  | 57 --------------------
 .../streaming/KafkaStreamingTest.java           | 57 +++++++++++++++++++-
 4 files changed, 59 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d0fe004d/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index dad5718..d9d45dd 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -247,11 +247,6 @@
       <artifactId>metrics-core</artifactId>
       <version>${dropwizard.metrics.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.reflections</groupId>
-      <artifactId>reflections</artifactId>
-      <version>0.9.10</version>
-    </dependency>
 
     <!-- KafkaIO -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/d0fe004d/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 93217b7..9d63ab0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -19,17 +19,8 @@
 package org.apache.beam.runners.spark.coders;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Arrays;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Source;
+import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.spark.serializer.KryoRegistrator;
-import org.reflections.Reflections;
 
 
 /**
@@ -39,40 +30,7 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator {
 
   @Override
   public void registerClasses(Kryo kryo) {
-    for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) {
-      kryo.register(clazz, new StatelessJavaSerializer());
-    }
-  }
-
-  /**
-   * Register coders and sources with {@link JavaSerializer} since they aren't guaranteed
to be
-   * Kryo-serializable.
-   */
-  private static class ClassesForJavaSerialization {
-    private static final Class<?>[] CLASSES_FOR_JAVA_SERIALIZATION = new Class<?>[]{
-        Coder.class, Source.class
-    };
-
-    private static final Iterable<Class<?>> INSTANCE;
-
-    /**
-     * Find all subclasses of ${@link CLASSES_FOR_JAVA_SERIALIZATION}
-     */
-    static {
-      final Reflections reflections = new Reflections();
-      INSTANCE = Iterables.concat(Lists.transform(Arrays.asList(CLASSES_FOR_JAVA_SERIALIZATION),
-          new Function<Class, Set<Class<?>>>() {
-            @SuppressWarnings({"unchecked", "ConstantConditions"})
-            @Nullable
-            @Override
-            public Set<Class<?>> apply(@Nullable Class clazz) {
-              return reflections.getSubTypesOf(clazz);
-            }
-          }));
-    }
-
-    static Iterable<Class<?>> getClasses() {
-      return INSTANCE;
-    }
+    // MicrobatchSource is serialized as data and may not be Kryo-serializable.
+    kryo.register(MicrobatchSource.class, new StatelessJavaSerializer());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d0fe004d/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
deleted file mode 100644
index 0468cd0..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.coders;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Source;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.reflections.Reflections;
-
-
-/**
- * BeamSparkRunnerRegistrator Test.
- */
-public class BeamSparkRunnerRegistratorTest {
-  @Test
-  public void testCodersAndSourcesRegistration() {
-    BeamSparkRunnerRegistrator registrator = new BeamSparkRunnerRegistrator();
-
-    Reflections reflections = new Reflections();
-    Iterable<Class<? extends Serializable>> classesForJavaSerialization =
-        Iterables.concat(reflections.getSubTypesOf(Coder.class),
-            reflections.getSubTypesOf(Source.class));
-
-    Kryo kryo = new Kryo();
-
-    registrator.registerClasses(kryo);
-
-    for (Class<?> clazz : classesForJavaSerialization) {
-      Assert.assertThat("Registered serializer for class " + clazz.getName()
-          + " was not an instance of " + StatelessJavaSerializer.class.getName(),
-          kryo.getSerializer(clazz),
-          Matchers.instanceOf(StatelessJavaSerializer.class));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d0fe004d/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 6be92d0..0853e9f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -17,8 +17,13 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamException;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -30,6 +35,9 @@ import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBat
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -52,6 +60,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+
 /**
  * Test Kafka as input.
  */
@@ -163,7 +172,7 @@ public class KafkaStreamingTest {
         .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
         .withTopics(Collections.singletonList(topic))
         .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(StringUtf8Coder.of())
+        .withValueCoder(NonKryoSerializableStringCoder.of())
         .updateConsumerProperties(consumerProps);
 
     PCollection<String> formatted =
@@ -212,4 +221,50 @@ public class KafkaStreamingTest {
     }
   }
 
+  /**
+   * This coder is not Kryo serializable, used to make sure
+   * {@link org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator} registers needed
+   * classes to ensure Java serialization is used instead.
+   */
+  private static class NonKryoSerializableStringCoder extends CustomCoder<String>
+      implements Serializable {
+    private Coder<String> stringCoder;
+    private Boolean isSerialized = false;
+
+    private NonKryoSerializableStringCoder() {
+    }
+
+    @JsonCreator
+    public static NonKryoSerializableStringCoder of() {
+      return new NonKryoSerializableStringCoder();
+    }
+
+    private Object readResolve() throws ObjectStreamException {
+      NonKryoSerializableStringCoder deserialized = new NonKryoSerializableStringCoder();
+      deserialized.stringCoder = StringUtf8Coder.of();
+      deserialized.isSerialized = true;
+      return deserialized;
+    }
+
+    private Object writeReplace() throws ObjectStreamException {
+      return new NonKryoSerializableStringCoder();
+    }
+
+    @Override
+    public void encode(String value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      if (!isSerialized) {
+        this.stringCoder = StringUtf8Coder.of();
+      }
+      stringCoder.encode(value, outStream, context);
+    }
+
+    @Override
+    public String decode(InputStream inStream, Context context) throws CoderException, IOException
{
+      if (!isSerialized) {
+        this.stringCoder = StringUtf8Coder.of();
+      }
+      return stringCoder.decode(inStream, context);
+    }
+  }
 }


Mime
View raw message