beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] incubator-beam git commit: Create runners/core module for artifact org.apache.beam:runners-core
Date Wed, 04 May 2016 17:28:01 GMT
Create runners/core module for artifact org.apache.beam:runners-core

This is strictly creating the module and moving one easy class to it.
Many of the utilities in org.apache.beam.util and subpackages should
move as developments allow.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/892ead2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/892ead2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/892ead2c

Branch: refs/heads/master
Commit: 892ead2c3b8bf7a53ee2d7570ba587453c186009
Parents: 6819dff
Author: Kenneth Knowles <klk@google.com>
Authored: Tue May 3 13:22:59 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed May 4 09:44:05 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 runners/core-java/pom.xml                       | 208 +++++++++++++++++++
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 112 ++++++++++
 runners/core-java/src/test/java/.placeholder    |   0
 runners/direct-java/pom.xml                     |  21 ++
 .../direct/GroupByKeyEvaluatorFactory.java      |   2 +-
 runners/flink/runner/pom.xml                    |  11 +
 .../FlinkGroupAlsoByWindowWrapper.java          |   8 +-
 runners/pom.xml                                 |   1 +
 .../util/GroupAlsoByWindowViaWindowSetDoFn.java | 105 ----------
 10 files changed, 364 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b0b258b..27787da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>runners-core</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>google-cloud-dataflow-java-runner</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
new file mode 100644
index 0000000..b6f6f29
--- /dev/null
+++ b/runners/core-java/pom.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>runners-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>runners-core</artifactId>
+  <name>Apache Beam :: Runners :: Core</name>
+  <description>Beam Runners Core provides utilities to aid runner authors.</description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals><goal>analyze-only</goal></goals>
+            <configuration>
+              <failOnWarning>true</failOnWarning>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <!-- Source plugin for generating source and test-source JARs. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <windowtitle>Beam Runners Core utilities ${project.version} API</windowtitle>
+          <doctitle>Beam Runners Core utilities for Java, version ${project.version}</doctitle>
+          <overview>../javadoc/overview.html</overview>
+
+          <subpackages>org.apache.beam.runners.core</subpackages>
+          <use>false</use>
+          <quiet>true</quiet>
+          <bottom><![CDATA[<br>]]></bottom>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.4.1</version>
+        <executions>
+          <!-- In the first phase, we pick dependencies and relocate them. -->
+          <execution>
+            <id>bundle-and-repackage</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <artifactSet>
+                <includes>
+                  <include>com.google.guava:guava</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <!-- TODO: Once ready, change the following pattern to 'com'
+                     only, exclude 'org.apache.beam.**', and remove
+                     the second relocation. -->
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>org.apache.beam.sdk.repackaged.com.google.common</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.thirdparty</pattern>
+                  <shadedPattern>org.apache.beam.sdk.repackaged.com.google.thirdparty</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+
+          <!-- In the second phase, we pick remaining dependencies and bundle
+               them without repackaging. -->
+          <execution>
+            <id>bundle-rest-without-repackaging</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+              <artifactSet>
+                <excludes>
+                  <exclude>com.google.guava:guava</exclude>
+                </excludes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+    </dependency>
+
+    <!-- build dependencies -->
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
new file mode 100644
index 0000000..73244f7
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.ReduceFnRunner;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * {@link ReduceFnRunner}.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowViaWindowSetDoFn<
+        K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
+    extends DoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT,
OutputT, W> {
+
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+      DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
+          WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT,
W> reduceFn) {
+    return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, reduceFn);
+  }
+
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+
+  private final WindowingStrategy<Object, W> windowingStrategy;
+  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+
+  private GroupAlsoByWindowViaWindowSetDoFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>)
windowingStrategy;
+    this.windowingStrategy = noWildcard;
+    this.reduceFn = reduceFn;
+  }
+
+  @Override
+  public void processElement(ProcessContext c) throws Exception {
+    KeyedWorkItem<K, InputT> element = c.element();
+
+    K key = c.element().key();
+    TimerInternals timerInternals = c.windowingInternals().timerInternals();
+
+    // It is the responsibility of the user of GroupAlsoByWindowsViaWindowSet to only
+    // provide a WindowingInternals instance with the appropriate key type for StateInternals.
+    @SuppressWarnings("unchecked")
+    StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
+
+    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+        new ReduceFnRunner<>(
+            key,
+            windowingStrategy,
+            stateInternals,
+            timerInternals,
+            c.windowingInternals(),
+            droppedDueToClosedWindow,
+            reduceFn,
+            c.getPipelineOptions());
+
+    reduceFnRunner.processElements(element.elementsIterable());
+    for (TimerData timer : element.timersIterable()) {
+      reduceFnRunner.onTimer(timer);
+    }
+    reduceFnRunner.persist();
+  }
+
+  @Override
+  public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
+    // Safe contravariant cast
+    @SuppressWarnings("unchecked")
+    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
+        (DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
+    return asFn;
+  }
+
+  @Override
+  public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
+    return droppedDueToLateness;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/core-java/src/test/java/.placeholder
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/.placeholder b/runners/core-java/src/test/java/.placeholder
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 292cc56..5c7dc38 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -243,6 +243,27 @@
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>runners-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
index 874ec17..9a08996 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItemCoder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index ab8d266..a1d5370 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -101,6 +101,17 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>runners-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>google-cloud-dataflow-java-runner</artifactId>
       <exclusions>
         <exclusion>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 8d9744f..0306aa1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.SystemReduceFn;
@@ -83,7 +83,7 @@ import java.util.Set;
  * This class is the key class implementing all the windowing/triggering logic of Apache
Beam.
  * To provide full compatibility and support for all the windowing/triggering combinations
offered by
  * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See
the code in
- * ({@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn}.
+ * ({@link org.apache.beam.runners.core.GroupAlsoByWindowsDoFn}.
  * <p/>
  * In a nutshell, when the execution arrives to this operator, we expect to have a stream
<b>already
  * grouped by key</b>. Each of the elements that enter here, registers a timer
@@ -95,7 +95,7 @@ import java.util.Set;
  * When a watermark arrives, all the registered timers are checked to see which ones are
ready to
  * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are
deregistered from
  * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers}
- * list, and are fed into the {@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn}
+ * list, and are fed into the {@link org.apache.beam.runners.core.GroupAlsoByWindowsDoFn}
  * for furhter processing.
  */
 public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
@@ -253,7 +253,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
   }
 
   /**
-   * Create the adequate {@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn},
+   * Create the adequate {@link org.apache.beam.runners.core.GroupAlsoByWindowsDoFn},
    * <b> if not already created</b>.
    * If a {@link org.apache.beam.sdk.transforms.Combine.KeyedCombineFn} was provided, then
    * a function with that combiner is created, so that elements are combined as they arrive.
This is

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 74812e8..d2d68df 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
   <name>Apache Beam :: Runners</name>
 
   <modules>
+    <module>core-java</module>
     <module>direct-java</module>
     <module>flink</module>
     <module>spark</module>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/892ead2c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
deleted file mode 100644
index c24f6da..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
- * {@link ReduceFnRunner}.
- */
-@SystemDoFnInternal
-public class GroupAlsoByWindowViaWindowSetDoFn<
-        K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
-    extends DoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT,
OutputT, W> {
-
-  public static <K, InputT, OutputT, W extends BoundedWindow>
-      DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
-          WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT,
W> reduceFn) {
-    return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, reduceFn);
-  }
-
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
-
-  private final WindowingStrategy<Object, W> windowingStrategy;
-  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
-
-  private GroupAlsoByWindowViaWindowSetDoFn(
-      WindowingStrategy<?, W> windowingStrategy,
-      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
-    @SuppressWarnings("unchecked")
-    WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>)
windowingStrategy;
-    this.windowingStrategy = noWildcard;
-    this.reduceFn = reduceFn;
-  }
-
-  @Override
-  public void processElement(ProcessContext c) throws Exception {
-    KeyedWorkItem<K, InputT> element = c.element();
-
-    K key = c.element().key();
-    TimerInternals timerInternals = c.windowingInternals().timerInternals();
-
-    // It is the responsibility of the user of GroupAlsoByWindowsViaWindowSet to only
-    // provide a WindowingInternals instance with the appropriate key type for StateInternals.
-    @SuppressWarnings("unchecked")
-    StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
-
-    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
-        new ReduceFnRunner<>(
-            key,
-            windowingStrategy,
-            stateInternals,
-            timerInternals,
-            c.windowingInternals(),
-            droppedDueToClosedWindow,
-            reduceFn,
-            c.getPipelineOptions());
-
-    reduceFnRunner.processElements(element.elementsIterable());
-    for (TimerData timer : element.timersIterable()) {
-      reduceFnRunner.onTimer(timer);
-    }
-    reduceFnRunner.persist();
-  }
-
-  @Override
-  public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
-    // Safe contravariant cast
-    @SuppressWarnings("unchecked")
-    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn =
-        (DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
-    return asFn;
-  }
-
-  @Override
-  public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
-    return droppedDueToLateness;
-  }
-}


Mime
View raw message