beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/3] beam git commit: Full removal of Aggregators in Java SDK and Runners
Date Tue, 02 May 2017 23:38:36 GMT
Repository: beam
Updated Branches:
  refs/heads/master 5bfd3e049 -> 4682238dc


http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
deleted file mode 100644
index cfaf0a6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.MoreObjects;
-import java.io.Serializable;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator.
- *
- * <p>This {@link Aggregator} is designed to be constructed without a delegate, at
pipeline
- * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which
it
- * submits values must be provided by the runner at execution time.
- *
- * @param <AggInputT> the type of input element
- * @param <AggOutputT> the type of output element
- */
-public class DelegatingAggregator<AggInputT, AggOutputT>
-    implements Aggregator<AggInputT, AggOutputT>, Serializable {
-  private static final AtomicInteger ID_GEN = new AtomicInteger();
-  private final int id;
-
-  private final String name;
-
-  private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
-  private Aggregator<AggInputT, ?> delegate;
-
-  public DelegatingAggregator(String name,
-      CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    this.id = ID_GEN.getAndIncrement();
-    this.name = checkNotNull(name, "name cannot be null");
-    // Safe contravariant cast
-    @SuppressWarnings("unchecked")
-    CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
-        (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot
be null");
-    this.combineFn = specificCombiner;
-  }
-
-  @Override
-  public void addValue(AggInputT value) {
-    if (delegate == null) {
-      throw new IllegalStateException(
-          String.format(
-              "addValue cannot be called on Aggregator outside of the execution of a %s.",
-              DoFn.class.getSimpleName()));
-    } else {
-      delegate.addValue(value);
-    }
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
-    return combineFn;
-  }
-
-  /**
-   * Sets the current delegate of the Aggregator.
-   *
-   * @param delegate the delegate to set in this aggregator
-   */
-  public void setDelegate(Aggregator<AggInputT, ?> delegate) {
-    this.delegate = delegate;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .add("name", name)
-        .add("combineFn", combineFn)
-        .toString();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(id, name, combineFn.getClass());
-  }
-
-  /**
-   * Indicates whether some other object is "equal to" this one.
-   *
-   * <p>{@code DelegatingAggregator} instances are equal if they have the same name,
their
-   * CombineFns are the same class, and they have identical IDs.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (o == null) {
-      return false;
-    }
-    if (o instanceof DelegatingAggregator) {
-      DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
-      return Objects.equals(this.id, that.id)
-          && Objects.equals(this.name, that.name)
-          && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index 4cb1142..d3ebbb7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -47,8 +47,6 @@ import org.apache.beam.sdk.values.TimestampedValue;
  * <p>Example 2: track a latest computed value in an aggregator:
  * <pre>{@code
  * class MyDoFn extends DoFn<String, String> {
- *  private Aggregator<TimestampedValue<Double>, Double> latestValue =
- *    createAggregator("latestValue", new Latest.LatestFn<Double>());
  *
  *  {@literal @}ProcessElement
  *  public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 72cba79..f29aeb9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -43,7 +43,6 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fake.FakeAggregatorFactory;
 import org.apache.beam.fn.harness.fake.FakeStepContext;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
@@ -316,7 +315,6 @@ public class ProcessBundleHandler {
             (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()),
             new ArrayList<>(doFnInfo.getOutputMap().values()),
             new FakeStepContext(),
-            new FakeAggregatorFactory(),
             (WindowingStrategy) doFnInfo.getWindowingStrategy());
     return runner;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
deleted file mode 100644
index b3b7f48..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.fake;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * A fake implementation of an {@link AggregatorFactory} that is to be filled in at a later
time.
- * The factory returns {@link Aggregator}s that do nothing when a value is added.
- */
-public class FakeAggregatorFactory implements AggregatorFactory {
-  @Override
-  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-      Class<?> fnClass,
-      StepContext stepContext,
-      String aggregatorName,
-      CombineFn<InputT, AccumT, OutputT> combine) {
-    return new Aggregator<InputT, OutputT>() {
-      @Override
-      public void addValue(InputT value) {}
-
-      @Override
-      public String getName() {
-        return aggregatorName;
-      }
-
-      @Override
-      public CombineFn<InputT, ?, OutputT> getCombineFn() {
-        return combine;
-      }
-    };
-  }
-}


Mime
View raw message