beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [44/50] [abbrv] incubator-beam git commit: Add initial microbenchmarks directory
Date Wed, 06 Jul 2016 17:20:50 GMT
Add initial microbenchmarks directory


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b36aeb92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b36aeb92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b36aeb92

Branch: refs/heads/runners-spark2
Commit: b36aeb92003b1c613c56633b2b5a0800260d92d3
Parents: a17a8b2
Author: bchambers <bchambers@google.com>
Authored: Tue Jun 28 15:55:16 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Wed Jul 6 10:18:53 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/DoFnReflector.java      |  12 +-
 sdks/java/microbenchmarks/README.md             |  42 ++++
 sdks/java/microbenchmarks/pom.xml               | 110 +++++++++
 .../coders/AvroCoderBenchmark.java              | 121 ++++++++++
 .../coders/ByteArrayCoderBenchmark.java         |  66 +++++
 .../coders/CoderBenchmarking.java               |  42 ++++
 .../coders/StringUtf8CoderBenchmark.java        |  72 ++++++
 .../transforms/DoFnReflectorBenchmark.java      | 239 +++++++++++++++++++
 sdks/java/pom.xml                               |   1 +
 9 files changed, 699 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index 452ee8e..e711d04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -123,7 +123,7 @@ public abstract class DoFnReflector {
    * @param c the {@link org.apache.beam.sdk.transforms.DoFnWithContext.ProcessContext}
    *     to pass to {@link ProcessElement}.
    */
-  abstract <InputT, OutputT> void invokeProcessElement(
+  public abstract <InputT, OutputT> void invokeProcessElement(
       DoFnWithContext<InputT, OutputT> fn,
       DoFnWithContext<InputT, OutputT>.ProcessContext c,
       ExtraContextFactory<InputT, OutputT> extra);
@@ -135,7 +135,7 @@ public abstract class DoFnReflector {
    * @param c the {@link org.apache.beam.sdk.transforms.DoFnWithContext.Context}
    *     to pass to {@link StartBundle}.
    */
-  <InputT, OutputT> void invokeStartBundle(
+  public <InputT, OutputT> void invokeStartBundle(
      DoFnWithContext<InputT, OutputT> fn,
      DoFnWithContext<InputT, OutputT>.Context c,
      ExtraContextFactory<InputT, OutputT> extra) {
@@ -149,7 +149,7 @@ public abstract class DoFnReflector {
    * @param c the {@link org.apache.beam.sdk.transforms.DoFnWithContext.Context}
    *     to pass to {@link FinishBundle}.
    */
-  abstract <InputT, OutputT> void invokeFinishBundle(
+  public abstract <InputT, OutputT> void invokeFinishBundle(
       DoFnWithContext<InputT, OutputT> fn,
       DoFnWithContext<InputT, OutputT>.Context c,
       ExtraContextFactory<InputT, OutputT> extra);
@@ -430,7 +430,7 @@ public abstract class DoFnReflector {
     }
 
     @Override
-    <InputT, OutputT> void invokeProcessElement(
+    public <InputT, OutputT> void invokeProcessElement(
         DoFnWithContext<InputT, OutputT> fn,
         DoFnWithContext<InputT, OutputT>.ProcessContext c,
         ExtraContextFactory<InputT, OutputT> extra) {
@@ -438,7 +438,7 @@ public abstract class DoFnReflector {
     }
 
     @Override
-    <InputT, OutputT> void invokeStartBundle(
+    public <InputT, OutputT> void invokeStartBundle(
         DoFnWithContext<InputT, OutputT> fn,
         DoFnWithContext<InputT, OutputT>.Context c,
         ExtraContextFactory<InputT, OutputT> extra) {
@@ -449,7 +449,7 @@ public abstract class DoFnReflector {
     }
 
     @Override
-    <InputT, OutputT> void invokeFinishBundle(
+    public <InputT, OutputT> void invokeFinishBundle(
         DoFnWithContext<InputT, OutputT> fn,
         DoFnWithContext<InputT, OutputT>.Context c,
         ExtraContextFactory<InputT, OutputT> extra) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/microbenchmarks/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/README.md b/sdks/java/microbenchmarks/README.md
new file mode 100644
index 0000000..627e669
--- /dev/null
+++ b/sdks/java/microbenchmarks/README.md
@@ -0,0 +1,42 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Microbenchmarks for parts of the Beam SDK
+
+To run benchmarks:
+
+ 1. Run `mvn install` in the top directory to install the SDK.
+
+ 2. Build the benchmark package:
+
+        cd sdks/java/microbenchmarks
+        mvn package
+
+ 3. Run the benchmark harness:
+
+        java -jar target/microbenchmarks.jar
+
+ 4. (alternate to step 3)
+    to run just a subset of benchmarks, pass a regular expression that
+    matches the benchmarks you want to run (this can match against the class
+    name, or the method name).  E.g., to run any benchmarks with
+    "DoFnReflector" in the name:
+
+        java -jar target/microbenchmarks.jar ".*DoFnReflector.*"
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/microbenchmarks/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/pom.xml b/sdks/java/microbenchmarks/pom.xml
new file mode 100644
index 0000000..96a76ec
--- /dev/null
+++ b/sdks/java/microbenchmarks/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-parent</artifactId>
+    <version>0.2.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-microbenchmarks</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Microbenchmarks</name>
+  <description>Microbenchmarks for components in the Beam Java SDK.</description>
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <finalName>microbenchmarks</finalName>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.openjdk.jmh.Main</mainClass>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
+      <scope>runtime</scope>
+    </dependency>
+
+    <!-- Keep jmh-core and jmh-generator-annprocess on the same version: the
+         annotation processor generates benchmark code against the jmh-core API
+         of its own release. -->
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-core</artifactId>
+      <version>1.0.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+      <version>1.0.1</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/AvroCoderBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/AvroCoderBenchmark.java
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/AvroCoderBenchmark.java
new file mode 100644
index 0000000..39b31ef
--- /dev/null
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/AvroCoderBenchmark.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.microbenchmarks.coders;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Benchmarks for {@link AvroCoder}.
+ *
+ * <p>Each benchmark passes a {@code Pojo} to {@code CoderBenchmarking.testCoder} and returns
+ * the result; every benchmark is run once for each value of {@link #isWholeStream}.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Warmup(iterations = 5)
+public class AvroCoderBenchmark {
+
+  /** Two-field value type used as the benchmark payload. */
+  @DefaultCoder(AvroCoder.class)
+  private static class Pojo {
+    public String text;
+    public int count;
+
+    // Empty constructor required for Avro decoding.
+    @SuppressWarnings("unused")
+    public Pojo() {
+    }
+
+    public Pojo(String text, int count) {
+      this.text = text;
+      this.count = count;
+    }
+
+    /** Two instances are equal when they have the same class, {@code text} and {@code count}. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      Pojo other = (Pojo) o;
+      return count == other.count
+          && (text == null ? other.text == null : text.equals(other.text));
+    }
+
+    /** Hashes exactly the fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+      // A null text contributes 0, mirroring the null handling in equals().
+      int textHash = text == null ? 0 : text.hashCode();
+      return 31 * textHash + count;
+    }
+
+    @Override
+    public String toString() {
+      return "Pojo{"
+          + "text='" + text + '\''
+          + ", count=" + count
+          + '}';
+    }
+  }
+
+  AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+
+  /** Forwarded to {@code CoderBenchmarking.testCoder} as its {@code isWholeStream} argument. */
+  @Param({"true", "false"})
+  boolean isWholeStream;
+
+  Pojo shortPojo;
+  Pojo longPojo;
+
+  /** Builds the two payloads: a short text and a text of 60 * 1024 {@code 'a'} characters. */
+  @Setup
+  public void setUp() {
+    shortPojo = new Pojo("hello world", 42);
+
+    char[] chars60k = new char[60 * 1024];
+    Arrays.fill(chars60k, 'a');
+    longPojo = new Pojo(new String(chars60k), 42);
+  }
+
+  /**
+   * Runs the short payload through {@code CoderBenchmarking.testCoder}.
+   *
+   * @throws IOException if the helper fails
+   */
+  @Benchmark
+  public Pojo codeShortPojo() throws IOException {
+    return CoderBenchmarking.testCoder(coder, isWholeStream, shortPojo);
+  }
+
+  /**
+   * Runs the long payload through {@code CoderBenchmarking.testCoder}.
+   *
+   * @throws IOException if the helper fails
+   */
+  @Benchmark
+  public Pojo codeLongPojo() throws IOException {
+    return CoderBenchmarking.testCoder(coder, isWholeStream, longPojo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/ByteArrayCoderBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/ByteArrayCoderBenchmark.java
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/ByteArrayCoderBenchmark.java
new file mode 100644
index 0000000..df20a15
--- /dev/null
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/ByteArrayCoderBenchmark.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.microbenchmarks.coders;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Benchmarks for {@link ByteArrayCoder}.
+ *
+ * <p>Each benchmark passes a byte array to {@code CoderBenchmarking.testCoder} and returns
+ * the result; every benchmark is run once for each value of {@link #isWholeStream}.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Warmup(iterations = 5)
+public class ByteArrayCoderBenchmark {
+
+  ByteArrayCoder coder = ByteArrayCoder.of();
+
+  /** Forwarded to {@code CoderBenchmarking.testCoder} as its {@code isWholeStream} argument. */
+  @Param({"true", "false"})
+  boolean isWholeStream;
+
+  byte[] shortArray;
+  byte[] longArray;
+
+  /** Allocates a 10-byte and a 60 * 1024-byte array, both filled with the value 47. */
+  @Setup
+  public void setUp() {
+    shortArray = new byte[10];
+    Arrays.fill(shortArray, (byte) 47);
+    longArray = new byte[60 * 1024];
+    Arrays.fill(longArray, (byte) 47);
+  }
+
+  /**
+   * Runs the short array through {@code CoderBenchmarking.testCoder}.
+   *
+   * @throws IOException if the helper fails
+   */
+  @Benchmark
+  public byte[] codeShortArray() throws IOException {
+    return CoderBenchmarking.testCoder(coder, isWholeStream, shortArray);
+  }
+
+  /**
+   * Runs the long array through {@code CoderBenchmarking.testCoder}.
+   *
+   * @throws IOException if the helper fails
+   */
+  @Benchmark
+  public byte[] codeLongArray() throws IOException {
+    return CoderBenchmarking.testCoder(coder, isWholeStream, longArray);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/CoderBenchmarking.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/CoderBenchmarking.java
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/CoderBenchmarking.java
new file mode 100644
index 0000000..8523cb2
--- /dev/null
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/CoderBenchmarking.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.microbenchmarks.coders;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+
+import java.io.IOException;
+
+/**
+ * Utilities for writing coder benchmarks.
+ *
+ * <p>This class only hosts static helpers and cannot be instantiated or subclassed.
+ */
+final class CoderBenchmarking {
+
+  // Static utility holder; prevent instantiation.
+  private CoderBenchmarking() {
+  }
+
+  /**
+   * Encodes and decodes the given value using the specified Coder.
+   *
+   * @param coder the coder used for both the encode and the decode step
+   * @param isWholeStream if {@code true} the coder is used with {@code Coder.Context.OUTER},
+   *     otherwise with {@code Coder.Context.NESTED}
+   * @param value the value to encode
+   * @return the value obtained by decoding the encoded bytes
+   * @throws IOException if there are errors during encoding or decoding
+   */
+  public static <T> T testCoder(
+      Coder<T> coder, boolean isWholeStream, T value) throws IOException {
+    Coder.Context context =
+        isWholeStream ? Coder.Context.OUTER : Coder.Context.NESTED;
+    byte[] encoded = CoderUtils.encodeToByteArray(coder, value, context);
+    return CoderUtils.decodeFromByteArray(coder, encoded, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/StringUtf8CoderBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/StringUtf8CoderBenchmark.java
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/StringUtf8CoderBenchmark.java
new file mode 100644
index 0000000..c0bcb45
--- /dev/null
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/coders/StringUtf8CoderBenchmark.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.microbenchmarks.coders;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Benchmarks for {@link StringUtf8Coder}.
+ *
+ * <p>Three string sizes are measured (empty, short, and 60 * 1024 characters); every
+ * benchmark is run once for each value of {@link #isWholeStream}.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Warmup(iterations = 5)
+public class StringUtf8CoderBenchmark {
+
+  StringUtf8Coder coder = StringUtf8Coder.of();
+
+  /** Forwarded to {@code CoderBenchmarking.testCoder} as its {@code isWholeStream} argument. */
+  @Param({"true", "false"})
+  boolean isWholeStream;
+
+  String shortString;
+  String longString;
+
+  /** Prepares the short and the long input strings. */
+  @Setup
+  public void setUp() {
+    shortString = "hello world";
+    longString = repeated('a', 60 * 1024);
+  }
+
+  /** Returns a string consisting of {@code length} copies of {@code c}. */
+  private static String repeated(char c, int length) {
+    char[] filler = new char[length];
+    Arrays.fill(filler, c);
+    return new String(filler);
+  }
+
+  /** Runs the empty string through {@code CoderBenchmarking.testCoder}. */
+  @Benchmark
+  public String codeEmptyString() throws IOException {
+    return CoderBenchmarking.testCoder(coder, isWholeStream, "");
+  }
+
+  /** Runs the short string through {@code CoderBenchmarking.testCoder}. */
+  @Benchmark
+  public String codeShortString() throws IOException {
+    return CoderBenchmarking.testCoder(coder, isWholeStream, shortString);
+  }
+
+  /** Runs the long string through {@code CoderBenchmarking.testCoder}. */
+  @Benchmark
+  public String codeLongString() throws IOException {
+    return CoderBenchmarking.testCoder(coder, isWholeStream, longString);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
new file mode 100644
index 0000000..1b8ec2a
--- /dev/null
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.microbenchmarks.transforms;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnReflector;
+import org.apache.beam.sdk.transforms.DoFnWithContext;
+import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.joda.time.Instant;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmarks for {@link DoFn} and {@link DoFnWithContext} invocations, specifically
+ * for measuring the overhead of {@link DoFnReflector}.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Warmup(iterations = 5)
+public class DoFnReflectorBenchmark {
+
+  private static final String ELEMENT = "some string to use for testing";
+
+  private DoFn<String, String> doFn = new UpperCaseDoFn();
+  private DoFnWithContext<String, String> doFnWithContext = new UpperCaseDoFnWithContext();
+
+  private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT);
+  private StubDoFnWithContextProcessContext stubDoFnWithContextContext =
+      new StubDoFnWithContextProcessContext(doFnWithContext, ELEMENT);
+  private ExtraContextFactory<String, String> extraContextFactory =
+      new ExtraContextFactory<String, String>() {
+
+    @Override
+    public BoundedWindow window() {
+      return null;
+    }
+
+    @Override
+    public WindowingInternals<String, String> windowingInternals() {
+      return null;
+    }
+  };
+
+  private DoFnReflector doFnReflector;
+  private DoFn<String, String> adaptedDoFnWithContext;
+
+  @Setup
+  public void setUp() {
+    doFnReflector = DoFnReflector.of(doFnWithContext.getClass());
+    adaptedDoFnWithContext = doFnReflector.toDoFn(doFnWithContext);
+  }
+
+  @Benchmark
+  public String invokeDoFn() throws Exception {
+    doFn.processElement(stubDoFnContext);
+    return stubDoFnContext.output;
+  }
+
+  @Benchmark
+  public String invokeDoFnWithContextViaAdaptor() throws Exception {
+    adaptedDoFnWithContext.processElement(stubDoFnContext);
+    return stubDoFnContext.output;
+  }
+
+  @Benchmark
+  public String invokeDoFnWithContext() throws Exception {
+    doFnReflector.invokeProcessElement(
+        doFnWithContext, stubDoFnWithContextContext, extraContextFactory);
+    return stubDoFnWithContextContext.output;
+  }
+
+  private static class UpperCaseDoFn extends DoFn<String, String> {
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element().toUpperCase());
+    }
+  }
+
+  private static class UpperCaseDoFnWithContext extends DoFnWithContext<String, String>
{
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element().toUpperCase());
+    }
+  }
+
+  private static class StubDoFnProcessContext extends DoFn<String, String>.ProcessContext
{
+
+    private final String element;
+    private String output;
+
+    public StubDoFnProcessContext(DoFn<String, String> fn, String element) {
+      fn.super();
+      this.element = element;
+    }
+
+    @Override
+    public String element() {
+      return element;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return null;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return null;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return null;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return null;
+    }
+
+    @Override
+    public WindowingInternals<String, String> windowingInternals() {
+      return null;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return null;
+    }
+
+    @Override
+    public void output(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public void outputWithTimestamp(String output, Instant timestamp) {
+      output(output);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant
timestamp) {
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner)
{
+      return null;
+    }
+  }
+
+  private static class StubDoFnWithContextProcessContext
+      extends DoFnWithContext<String, String>.ProcessContext {
+    private final String element;
+    private  String output;
+
+    public StubDoFnWithContextProcessContext(DoFnWithContext<String, String> fn, String
element) {
+      fn.super();
+      this.element = element;
+    }
+
+    @Override
+    public String element() {
+      return element;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return null;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return null;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return null;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return null;
+    }
+
+    @Override
+    public void output(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public void outputWithTimestamp(String output, Instant timestamp) {
+      output(output);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant
timestamp) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b36aeb92/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 55aea6a..0350804 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -41,6 +41,7 @@
          a released artifact exists, we need to modify the build order.
     <module>maven-archetypes</module> -->
     <module>extensions</module>
+    <module>microbenchmarks</module>
   </modules>
 
   <profiles>


Mime
View raw message