Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 80631200CA4 for ; Wed, 3 May 2017 01:38:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7F426160BAC; Tue, 2 May 2017 23:38:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A2957160B9D for ; Wed, 3 May 2017 01:38:37 +0200 (CEST) Received: (qmail 25556 invoked by uid 500); 2 May 2017 23:38:36 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 25547 invoked by uid 99); 2 May 2017 23:38:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 May 2017 23:38:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A8F75DFE8F; Tue, 2 May 2017 23:38:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.apache.org Date: Tue, 02 May 2017 23:38:36 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] beam git commit: Full removal of Aggregators in Java SDK and Runners archived-at: Tue, 02 May 2017 23:38:38 -0000 Repository: beam Updated Branches: refs/heads/master 5bfd3e049 -> 4682238dc http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java deleted file mode 100644 index cfaf0a6..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.MoreObjects; -import java.io.Serializable; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator. - * - *

This {@link Aggregator} is designed to be constructed without a delegate, at pipeline - * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which it - * submits values must be provided by the runner at execution time. - * - * @param the type of input element - * @param the type of output element - */ -public class DelegatingAggregator - implements Aggregator, Serializable { - private static final AtomicInteger ID_GEN = new AtomicInteger(); - private final int id; - - private final String name; - - private final CombineFn combineFn; - - private Aggregator delegate; - - public DelegatingAggregator(String name, - CombineFn combiner) { - this.id = ID_GEN.getAndIncrement(); - this.name = checkNotNull(name, "name cannot be null"); - // Safe contravariant cast - @SuppressWarnings("unchecked") - CombineFn specificCombiner = - (CombineFn) checkNotNull(combiner, "combineFn cannot be null"); - this.combineFn = specificCombiner; - } - - @Override - public void addValue(AggInputT value) { - if (delegate == null) { - throw new IllegalStateException( - String.format( - "addValue cannot be called on Aggregator outside of the execution of a %s.", - DoFn.class.getSimpleName())); - } else { - delegate.addValue(value); - } - } - - @Override - public String getName() { - return name; - } - - @Override - public CombineFn getCombineFn() { - return combineFn; - } - - /** - * Sets the current delegate of the Aggregator. - * - * @param delegate the delegate to set in this aggregator - */ - public void setDelegate(Aggregator delegate) { - this.delegate = delegate; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("name", name) - .add("combineFn", combineFn) - .toString(); - } - - @Override - public int hashCode() { - return Objects.hash(id, name, combineFn.getClass()); - } - - /** - * Indicates whether some other object is "equal to" this one. - * - *

{@code DelegatingAggregator} instances are equal if they have the same name, their - * CombineFns are the same class, and they have identical IDs. - */ - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (o == null) { - return false; - } - if (o instanceof DelegatingAggregator) { - DelegatingAggregator that = (DelegatingAggregator) o; - return Objects.equals(this.id, that.id) - && Objects.equals(this.name, that.name) - && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass()); - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java index 4cb1142..d3ebbb7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java @@ -47,8 +47,6 @@ import org.apache.beam.sdk.values.TimestampedValue; *

Example 2: track a latest computed value in an aggregator: *

{@code
  * class MyDoFn extends DoFn {
- *  private Aggregator, Double> latestValue =
- *    createAggregator("latestValue", new Latest.LatestFn());
  *
  *  {@literal @}ProcessElement
  *  public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 72cba79..f29aeb9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -43,7 +43,6 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fake.FakeAggregatorFactory;
 import org.apache.beam.fn.harness.fake.FakeStepContext;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
@@ -316,7 +315,6 @@ public class ProcessBundleHandler {
             (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()),
             new ArrayList<>(doFnInfo.getOutputMap().values()),
             new FakeStepContext(),
-            new FakeAggregatorFactory(),
             (WindowingStrategy) doFnInfo.getWindowingStrategy());
     return runner;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
deleted file mode 100644
index b3b7f48..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.fake;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * A fake implementation of an {@link AggregatorFactory} that is to be filled in at a later time.
- * The factory returns {@link Aggregator}s that do nothing when a value is added.
- */
-public class FakeAggregatorFactory implements AggregatorFactory {
-  @Override
-  public  Aggregator createAggregatorForDoFn(
-      Class fnClass,
-      StepContext stepContext,
-      String aggregatorName,
-      CombineFn combine) {
-    return new Aggregator() {
-      @Override
-      public void addValue(InputT value) {}
-
-      @Override
-      public String getName() {
-        return aggregatorName;
-      }
-
-      @Override
-      public CombineFn getCombineFn() {
-        return combine;
-      }
-    };
-  }
-}