Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A098E200BE3 for ; Thu, 8 Dec 2016 05:21:56 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9F32E160B26; Thu, 8 Dec 2016 04:21:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5CE3D160B0C for ; Thu, 8 Dec 2016 05:21:54 +0100 (CET) Received: (qmail 55614 invoked by uid 500); 8 Dec 2016 04:21:53 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 55603 invoked by uid 99); 8 Dec 2016 04:21:53 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Dec 2016 04:21:53 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id ED2C9182132 for ; Thu, 8 Dec 2016 04:21:52 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id juoY6akdrBSg for ; Thu, 8 Dec 2016 04:21:40 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id B7BB85FB76 for ; Thu, 8 Dec 2016 04:21:37 +0000 (UTC) Received: (qmail 55244 invoked by uid 99); 8 Dec 2016 04:21:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Dec 2016 04:21:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0AB28F2152; Thu, 8 Dec 2016 04:21:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Thu, 08 Dec 2016 04:21:38 -0000 Message-Id: In-Reply-To: <42d6b37458b349618b3beeb3cce29883@git.apache.org> References: <42d6b37458b349618b3beeb3cce29883@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] incubator-beam git commit: Move CopyOnAccessStateInternals to runners/direct archived-at: Thu, 08 Dec 2016 04:21:56 -0000 Move CopyOnAccessStateInternals to runners/direct Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09e2f309 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09e2f309 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09e2f309 Branch: refs/heads/master Commit: 09e2f309c9554f58d2b9a2be5f83ef2751d9d40b Parents: 7729594 Author: Kenneth Knowles Authored: Wed Dec 7 14:28:39 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 7 20:21:27 2016 -0800 ---------------------------------------------------------------------- .../CopyOnAccessInMemoryStateInternals.java | 467 +++++++++++++++ .../runners/direct/DirectExecutionContext.java | 1 - .../beam/runners/direct/EvaluationContext.java | 1 - .../GroupAlsoByWindowEvaluatorFactory.java | 1 - .../beam/runners/direct/ParDoEvaluator.java | 1 - .../runners/direct/StepTransformResult.java | 1 - 
.../beam/runners/direct/TransformResult.java | 1 - .../CopyOnAccessInMemoryStateInternalsTest.java | 562 +++++++++++++++++++ .../runners/direct/EvaluationContextTest.java | 1 - .../StatefulParDoEvaluatorFactoryTest.java | 1 - .../CopyOnAccessInMemoryStateInternals.java | 453 --------------- .../sdk/util/state/InMemoryStateInternals.java | 33 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 552 ------------------ 13 files changed, 1054 insertions(+), 1021 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java new file mode 100644 index 0000000..e486a75 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.CombineFnUtil; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryBag; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryCombiningValue; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryState; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryStateBinder; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryValue; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryWatermarkHold; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateContext; +import org.apache.beam.sdk.util.state.StateContexts; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateTable; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTag.StateBinder; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import 
org.joda.time.Instant; + +/** + * {@link StateInternals} built on top of an underlying {@link StateTable} that contains instances + * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is + * accessed, an independent copy will be created within this table. + */ +public class CopyOnAccessInMemoryStateInternals implements StateInternals { + private final K key; + private final CopyOnAccessInMemoryStateTable table; + + /** + * Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null) + * StateInternals. + */ + public static CopyOnAccessInMemoryStateInternals withUnderlying( + K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) { + return new CopyOnAccessInMemoryStateInternals(key, underlying); + } + + private CopyOnAccessInMemoryStateInternals( + K key, CopyOnAccessInMemoryStateInternals underlying) { + this.key = key; + table = + new CopyOnAccessInMemoryStateTable(key, underlying == null ? null : underlying.table); + } + + /** + * Ensures this {@link CopyOnAccessInMemoryStateInternals} is complete. Other copies of state for + * the same Step and Key may be discarded after invoking this method. + * + *

<p>For each {@link StateNamespace}, for each {@link StateTag address} in that namespace that + * has not been bound in this {@link CopyOnAccessInMemoryStateInternals}, put a reference to that + * state within this {@link StateInternals}. + * + *

<p>Additionally, stores the {@link WatermarkHoldState} with the earliest time bound in the + * state table after the commit is completed, enabling calls to + * {@link #getEarliestWatermarkHold()}. + * + * @return this table + */ + public CopyOnAccessInMemoryStateInternals commit() { + table.commit(); + return this; + } + + /** + * Gets the earliest Watermark Hold present in this table. + * + *

Must be called after this state has been committed. Will throw an + * {@link IllegalStateException} if the state has not been committed. + */ + public Instant getEarliestWatermarkHold() { + // After commit, the watermark hold is always present, but may be + // BoundedWindow#TIMESTAMP_MAX_VALUE if there is no hold set. + checkState( + table.earliestWatermarkHold.isPresent(), + "Can't get the earliest watermark hold in a %s before it is committed", + getClass().getSimpleName()); + return table.earliestWatermarkHold.get(); + } + + @Override + public T state(StateNamespace namespace, StateTag address) { + return state(namespace, address, StateContexts.nullContext()); + } + + @Override + public T state( + StateNamespace namespace, StateTag address, StateContext c) { + return table.get(namespace, address, c); + } + + @Override + public K getKey() { + return key; + } + + public boolean isEmpty() { + return Iterables.isEmpty(table.values()); + } + + /** + * A {@link StateTable} that, when a value is retrieved with + * {@link StateTable#get(StateNamespace, StateTag, StateContext)}, first attempts to obtain a + * copy of existing {@link State} from an underlying {@link StateTable}. + */ + private static class CopyOnAccessInMemoryStateTable extends StateTable { + private final K key; + private Optional> underlying; + + /** + * The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}. + * + *

<p>There are three {@link StateBinderFactory} implementations used by the {@link + * CopyOnAccessInMemoryStateTable}. + * <ul> + * <li>The default {@link StateBinderFactory} is a {@link CopyOnBindBinderFactory}, allowing + * the table to copy any existing {@link State} values to this {@link StateTable} from the + * underlying table when accessed, at which point mutations will not be visible to the + * underlying table - effectively a "Copy by Value" binder.</li> + * <li>During the execution of the {@link #commit()} method, this is a + * {@link ReadThroughBinderFactory}, which copies the references to the existing + * {@link State} objects to this {@link StateTable}.</li> + * <li>After the execution of the {@link #commit()} method, this is an + * instance of {@link InMemoryStateBinderFactory}, which constructs new instances of state + * when a {@link StateTag} is bound.</li> + * </ul>
+ */ + private StateBinderFactory binderFactory; + + /** + * The earliest watermark hold in this table. + */ + private Optional earliestWatermarkHold; + + public CopyOnAccessInMemoryStateTable(K key, StateTable underlying) { + this.key = key; + this.underlying = Optional.fromNullable(underlying); + binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying); + earliestWatermarkHold = Optional.absent(); + } + + /** + * Copies all values in the underlying table to this table, then discards the underlying table. + * + *

<p>If there is an underlying table, this replaces the existing + * {@link CopyOnBindBinderFactory} with a {@link ReadThroughBinderFactory}, then reads all of + * the values in the existing table, binding the state values to this table. The old StateTable + * should be discarded after the call to {@link #commit()}. + * + *

After copying all of the existing values, replace the binder factory with an instance of + * {@link InMemoryStateBinderFactory} to construct new values, since all existing values + * are bound in this {@link StateTable table} and this table represents the canonical state. + */ + private void commit() { + Instant earliestHold = getEarliestWatermarkHold(); + if (underlying.isPresent()) { + ReadThroughBinderFactory readThroughBinder = + new ReadThroughBinderFactory<>(underlying.get()); + binderFactory = readThroughBinder; + Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this); + if (earliestUnderlyingHold.isBefore(earliestHold)) { + earliestHold = earliestUnderlyingHold; + } + } + earliestWatermarkHold = Optional.of(earliestHold); + clearEmpty(); + binderFactory = new InMemoryStateBinderFactory<>(key); + underlying = Optional.absent(); + } + + /** + * Get the earliest watermark hold in this table. Ignores the contents of any underlying table. + */ + private Instant getEarliestWatermarkHold() { + Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (State existingState : this.values()) { + if (existingState instanceof WatermarkHoldState) { + Instant hold = ((WatermarkHoldState) existingState).read(); + if (hold != null && hold.isBefore(earliest)) { + earliest = hold; + } + } + } + return earliest; + } + + /** + * Clear all empty {@link StateNamespace StateNamespaces} from this table. If all states are + * empty, clear the entire table. + * + *

Because {@link InMemoryState} is not removed from the {@link StateTable} after it is + * cleared, in case contents are modified after being cleared, the table must be explicitly + * checked to ensure that it contains state and removed if not (otherwise we may never use + * the table again). + */ + private void clearEmpty() { + Collection emptyNamespaces = new HashSet<>(this.getNamespacesInUse()); + for (StateNamespace namespace : this.getNamespacesInUse()) { + for (State existingState : this.getTagsInUse(namespace).values()) { + if (!((InMemoryState) existingState).isCleared()) { + emptyNamespaces.remove(namespace); + break; + } + } + } + for (StateNamespace empty : emptyNamespaces) { + this.clearNamespace(empty); + } + } + + @Override + protected StateBinder binderForNamespace(final StateNamespace namespace, StateContext c) { + return binderFactory.forNamespace(namespace, c); + } + + private interface StateBinderFactory { + StateBinder forNamespace(StateNamespace namespace, StateContext c); + } + + /** + * {@link StateBinderFactory} that creates a copy of any existing state when the state is bound. 
+ */ + private static class CopyOnBindBinderFactory implements StateBinderFactory { + private final K key; + private final Optional> underlying; + + public CopyOnBindBinderFactory(K key, Optional> underlying) { + this.key = key; + this.underlying = underlying; + } + + private boolean containedInUnderlying(StateNamespace namespace, StateTag tag) { + return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace) + && underlying.get().getTagsInUse(namespace).containsKey(tag); + } + + @Override + public StateBinder forNamespace(final StateNamespace namespace, final StateContext c) { + return new StateBinder() { + @Override + public WatermarkHoldState bindWatermark( + StateTag> address, + OutputTimeFn outputTimeFn) { + if (containedInUnderlying(namespace, address)) { + @SuppressWarnings("unchecked") + InMemoryState> existingState = + (InMemoryState>) + underlying.get().get(namespace, address, c); + return existingState.copy(); + } else { + return new InMemoryWatermarkHold<>( + outputTimeFn); + } + } + + @Override + public ValueState bindValue( + StateTag> address, Coder coder) { + if (containedInUnderlying(namespace, address)) { + @SuppressWarnings("unchecked") + InMemoryState> existingState = + (InMemoryState>) + underlying.get().get(namespace, address, c); + return existingState.copy(); + } else { + return new InMemoryValue<>(); + } + } + + @Override + public AccumulatorCombiningState + bindCombiningValue( + StateTag> address, + Coder accumCoder, CombineFn combineFn) { + if (containedInUnderlying(namespace, address)) { + @SuppressWarnings("unchecked") + InMemoryState> + existingState = ( + InMemoryState>) underlying.get().get(namespace, address, c); + return existingState.copy(); + } else { + return new InMemoryCombiningValue<>( + key, combineFn.asKeyedFn()); + } + } + + @Override + public BagState bindBag( + StateTag> address, Coder elemCoder) { + if (containedInUnderlying(namespace, address)) { + @SuppressWarnings("unchecked") + InMemoryState> 
existingState = + (InMemoryState>) + underlying.get().get(namespace, address, c); + return existingState.copy(); + } else { + return new InMemoryBag<>(); + } + } + + @Override + public AccumulatorCombiningState + bindKeyedCombiningValue( + StateTag> address, + Coder accumCoder, + KeyedCombineFn combineFn) { + if (containedInUnderlying(namespace, address)) { + @SuppressWarnings("unchecked") + InMemoryState> + existingState = ( + InMemoryState>) underlying.get().get(namespace, address, c); + return existingState.copy(); + } else { + return new InMemoryCombiningValue<>(key, combineFn); + } + } + + @Override + public AccumulatorCombiningState + bindKeyedCombiningValueWithContext( + StateTag> address, + Coder accumCoder, + KeyedCombineFnWithContext combineFn) { + return bindKeyedCombiningValue( + address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + } + }; + } + } + + /** + * {@link StateBinderFactory} that reads directly from the underlying table. Used during calls + * to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from + * the underlying table. + */ + private static class ReadThroughBinderFactory implements StateBinderFactory { + private final StateTable underlying; + + public ReadThroughBinderFactory(StateTable underlying) { + this.underlying = underlying; + } + + public Instant readThroughAndGetEarliestHold(StateTable readTo) { + Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (StateNamespace namespace : underlying.getNamespacesInUse()) { + for (Map.Entry, ? extends State> existingState : + underlying.getTagsInUse(namespace).entrySet()) { + if (!((InMemoryState) existingState.getValue()).isCleared()) { + // Only read through non-cleared values to ensure that completed windows are + // eventually discarded, and remember the earliest watermark hold from among those + // values. 
+ State state = + readTo.get(namespace, existingState.getKey(), StateContexts.nullContext()); + if (state instanceof WatermarkHoldState) { + Instant hold = ((WatermarkHoldState) state).read(); + if (hold != null && hold.isBefore(earliestHold)) { + earliestHold = hold; + } + } + } + } + } + return earliestHold; + } + + @Override + public StateBinder forNamespace(final StateNamespace namespace, final StateContext c) { + return new StateBinder() { + @Override + public WatermarkHoldState bindWatermark( + StateTag> address, + OutputTimeFn outputTimeFn) { + return underlying.get(namespace, address, c); + } + + @Override + public ValueState bindValue( + StateTag> address, Coder coder) { + return underlying.get(namespace, address, c); + } + + @Override + public AccumulatorCombiningState + bindCombiningValue( + StateTag> address, + Coder accumCoder, CombineFn combineFn) { + return underlying.get(namespace, address, c); + } + + @Override + public BagState bindBag( + StateTag> address, Coder elemCoder) { + return underlying.get(namespace, address, c); + } + + @Override + public AccumulatorCombiningState + bindKeyedCombiningValue( + StateTag> address, + Coder accumCoder, + KeyedCombineFn combineFn) { + return underlying.get(namespace, address, c); + } + + @Override + public AccumulatorCombiningState + bindKeyedCombiningValueWithContext( + StateTag> address, + Coder accumCoder, + KeyedCombineFnWithContext combineFn) { + return bindKeyedCombiningValue( + address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + } + }; + } + } + + private static class InMemoryStateBinderFactory implements StateBinderFactory { + private final K key; + + public InMemoryStateBinderFactory(K key) { + this.key = key; + } + + @Override + public StateBinder forNamespace(StateNamespace namespace, StateContext c) { + return new InMemoryStateBinder<>(key, c); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 8cec8f7..c6051f0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -23,7 +23,6 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.util.BaseExecutionContext; import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; /** * Execution Context for the {@link DirectRunner}. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index b5a23d7..230d91b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 87cbbcd..bb11923 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -50,7 +50,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import 
org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 504ddc4..a915cf0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index d58b027..01b2a72 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -30,7 +30,6 @@ import 
org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index b4797b0..8bb5f93 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java new file mode 100644 index 0000000..deefc68 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import 
org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaceForTest; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CopyOnAccessInMemoryStateInternals}. + */ +@RunWith(JUnit4.class) +public class CopyOnAccessInMemoryStateInternalsTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + private String key = "foo"; + @Test + public void testGetWithEmpty() { + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + BagState stringBag = internals.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("bar"); + stringBag.add("baz"); + assertThat(stringBag.read(), containsInAnyOrder("baz", "bar")); + + BagState reReadStringBag = internals.state(namespace, bagTag); + assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar")); + } + + @Test + public void testGetWithAbsentInUnderlying() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + BagState 
stringBag = internals.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("bar"); + stringBag.add("baz"); + assertThat(stringBag.read(), containsInAnyOrder("baz", "bar")); + + BagState reReadVoidBag = internals.state(namespace, bagTag); + assertThat(reReadVoidBag.read(), containsInAnyOrder("baz", "bar")); + + BagState underlyingState = underlying.state(namespace, bagTag); + assertThat(underlyingState.read(), emptyIterable()); + } + + /** + * Tests that retrieving state with an underlying StateInternals with an existing value returns + * a value that initially has equal value to the provided state but can be modified without + * modifying the existing state. + */ + @Test + public void testGetWithPresentInUnderlying() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> valueTag = StateTags.value("foo", StringUtf8Coder.of()); + ValueState underlyingValue = underlying.state(namespace, valueTag); + assertThat(underlyingValue.read(), nullValue(String.class)); + + underlyingValue.write("bar"); + assertThat(underlyingValue.read(), equalTo("bar")); + + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + ValueState copyOnAccessState = internals.state(namespace, valueTag); + assertThat(copyOnAccessState.read(), equalTo("bar")); + + copyOnAccessState.write("baz"); + assertThat(copyOnAccessState.read(), equalTo("baz")); + assertThat(underlyingValue.read(), equalTo("bar")); + + ValueState reReadUnderlyingValue = underlying.state(namespace, valueTag); + assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); + } + + @Test + public void testBagStateWithUnderlying() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + + StateNamespace namespace = new 
StateNamespaceForTest("foo"); + StateTag> valueTag = StateTags.bag("foo", VarIntCoder.of()); + BagState underlyingValue = underlying.state(namespace, valueTag); + assertThat(underlyingValue.read(), emptyIterable()); + + underlyingValue.add(1); + assertThat(underlyingValue.read(), containsInAnyOrder(1)); + + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + BagState copyOnAccessState = internals.state(namespace, valueTag); + assertThat(copyOnAccessState.read(), containsInAnyOrder(1)); + + copyOnAccessState.add(4); + assertThat(copyOnAccessState.read(), containsInAnyOrder(4, 1)); + assertThat(underlyingValue.read(), containsInAnyOrder(1)); + + BagState reReadUnderlyingValue = underlying.state(namespace, valueTag); + assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); + } + + @Test + public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + CombineFn sumLongFn = new Sum.SumLongFn(); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + CoderRegistry reg = TestPipeline.create().getCoderRegistry(); + StateTag> stateTag = + StateTags.combiningValue("summer", + sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn); + CombiningState underlyingValue = underlying.state(namespace, stateTag); + assertThat(underlyingValue.read(), equalTo(0L)); + + underlyingValue.add(1L); + assertThat(underlyingValue.read(), equalTo(1L)); + + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + CombiningState copyOnAccessState = internals.state(namespace, stateTag); + assertThat(copyOnAccessState.read(), equalTo(1L)); + + copyOnAccessState.add(4L); + assertThat(copyOnAccessState.read(), equalTo(5L)); + assertThat(underlyingValue.read(), equalTo(1L)); 
+ + CombiningState reReadUnderlyingValue = underlying.state(namespace, stateTag); + assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); + } + + @Test + public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + KeyedCombineFn sumLongFn = new Sum.SumLongFn().asKeyedFn(); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + CoderRegistry reg = TestPipeline.create().getCoderRegistry(); + StateTag> stateTag = + StateTags.keyedCombiningValue( + "summer", + sumLongFn.getAccumulatorCoder( + reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)), + sumLongFn); + CombiningState underlyingValue = underlying.state(namespace, stateTag); + assertThat(underlyingValue.read(), equalTo(0L)); + + underlyingValue.add(1L); + assertThat(underlyingValue.read(), equalTo(1L)); + + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + CombiningState copyOnAccessState = internals.state(namespace, stateTag); + assertThat(copyOnAccessState.read(), equalTo(1L)); + + copyOnAccessState.add(4L); + assertThat(copyOnAccessState.read(), equalTo(5L)); + assertThat(underlyingValue.read(), equalTo(1L)); + + CombiningState reReadUnderlyingValue = underlying.state(namespace, stateTag); + assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); + } + + @Test + public void testWatermarkHoldStateWithUnderlying() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + + OutputTimeFn outputTimeFn = + OutputTimeFns.outputAtEarliestInputTimestamp(); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> stateTag = + StateTags.watermarkStateInternal("wmstate", outputTimeFn); + WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag); + 
assertThat(underlyingValue.read(), nullValue()); + + underlyingValue.add(new Instant(250L)); + assertThat(underlyingValue.read(), equalTo(new Instant(250L))); + + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag); + assertThat(copyOnAccessState.read(), equalTo(new Instant(250L))); + + copyOnAccessState.add(new Instant(100L)); + assertThat(copyOnAccessState.read(), equalTo(new Instant(100L))); + assertThat(underlyingValue.read(), equalTo(new Instant(250L))); + + copyOnAccessState.add(new Instant(500L)); + assertThat(copyOnAccessState.read(), equalTo(new Instant(100L))); + + WatermarkHoldState reReadUnderlyingValue = + underlying.state(namespace, stateTag); + assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); + } + + @Test + public void testCommitWithoutUnderlying() { + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + BagState stringBag = internals.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("bar"); + stringBag.add("baz"); + assertThat(stringBag.read(), containsInAnyOrder("baz", "bar")); + + internals.commit(); + + BagState reReadStringBag = internals.state(namespace, bagTag); + assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar")); + assertThat(internals.isEmpty(), is(false)); + } + + @Test + public void testCommitWithUnderlying() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = 
StateTags.bag("foo", StringUtf8Coder.of()); + BagState stringBag = underlying.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("bar"); + stringBag.add("baz"); + + internals.commit(); + BagState reReadStringBag = internals.state(namespace, bagTag); + assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar")); + + reReadStringBag.add("spam"); + + BagState underlyingState = underlying.state(namespace, bagTag); + assertThat(underlyingState.read(), containsInAnyOrder("spam", "bar", "baz")); + assertThat(underlyingState, is(theInstance(stringBag))); + assertThat(internals.isEmpty(), is(false)); + } + + @Test + public void testCommitWithClearedInUnderlying() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + CopyOnAccessInMemoryStateInternals secondUnderlying = + spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying)); + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + BagState stringBag = underlying.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("bar"); + stringBag.add("baz"); + stringBag.clear(); + // We should not read through the cleared bag + secondUnderlying.commit(); + + // Should not be visible + stringBag.add("foo"); + + internals.commit(); + BagState internalsStringBag = internals.state(namespace, bagTag); + assertThat(internalsStringBag.read(), emptyIterable()); + verify(secondUnderlying, never()).state(namespace, bagTag); + assertThat(internals.isEmpty(), is(false)); + } + + @Test + public void testCommitWithOverwrittenUnderlying() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + 
CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + BagState stringBag = underlying.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("bar"); + stringBag.add("baz"); + + BagState internalsState = internals.state(namespace, bagTag); + internalsState.add("eggs"); + internalsState.add("ham"); + internalsState.add("0x00ff00"); + internalsState.add("&"); + + internals.commit(); + + BagState reReadInternalState = internals.state(namespace, bagTag); + assertThat( + reReadInternalState.read(), + containsInAnyOrder("bar", "baz", "0x00ff00", "eggs", "&", "ham")); + BagState reReadUnderlyingState = underlying.state(namespace, bagTag); + assertThat(reReadUnderlyingState.read(), containsInAnyOrder("bar", "baz")); + } + + @Test + public void testCommitWithAddedUnderlying() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + + internals.commit(); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + BagState stringBag = underlying.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("bar"); + stringBag.add("baz"); + + BagState internalState = internals.state(namespace, bagTag); + assertThat(internalState.read(), emptyIterable()); + + BagState reReadUnderlyingState = underlying.state(namespace, bagTag); + assertThat(reReadUnderlyingState.read(), containsInAnyOrder("bar", "baz")); + } + + @Test + public void testCommitWithEmptyTableIsEmpty() { + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, 
null); + + internals.commit(); + + assertThat(internals.isEmpty(), is(true)); + } + + @Test + public void testCommitWithOnlyClearedValuesIsEmpty() { + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + BagState stringBag = internals.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("foo"); + stringBag.clear(); + + internals.commit(); + + assertThat(internals.isEmpty(), is(true)); + } + + @Test + public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() { + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); + + StateNamespace namespace = new StateNamespaceForTest("foo"); + StateTag> bagTag = StateTags.bag("foo", StringUtf8Coder.of()); + BagState stringBag = underlying.state(namespace, bagTag); + assertThat(stringBag.read(), emptyIterable()); + + stringBag.add("bar"); + stringBag.add("baz"); + + internals.commit(); + assertThat(internals.isEmpty(), is(false)); + } + + @Test + public void testGetEarliestWatermarkHoldAfterCommit() { + BoundedWindow first = new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(2048L); + } + }; + BoundedWindow second = new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(689743L); + } + }; + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); + + StateTag> firstHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState firstHold = + internals.state(StateNamespaces.window(null, first), firstHoldAddress); + firstHold.add(new 
Instant(22L)); + + StateTag> secondHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState secondHold = + internals.state(StateNamespaces.window(null, second), secondHoldAddress); + secondHold.add(new Instant(2L)); + + internals.commit(); + assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(2L))); + } + + @Test + public void testGetEarliestWatermarkHoldWithEarliestInUnderlyingTable() { + BoundedWindow first = new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(2048L); + } + }; + BoundedWindow second = new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(689743L); + } + }; + CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); + StateTag> firstHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState firstHold = + underlying.state(StateNamespaces.window(null, first), firstHoldAddress); + firstHold.add(new Instant(22L)); + + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); + + StateTag> secondHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState secondHold = + internals.state(StateNamespaces.window(null, second), secondHoldAddress); + secondHold.add(new Instant(244L)); + + internals.commit(); + assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(22L))); + } + + @Test + public void testGetEarliestWatermarkHoldWithEarliestInNewTable() { + BoundedWindow first = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(2048L); + } + }; + BoundedWindow second = + new BoundedWindow() { + @Override + public Instant maxTimestamp() { + return new Instant(689743L); + } + }; + 
CopyOnAccessInMemoryStateInternals underlying = + CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); + StateTag> firstHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState firstHold = + underlying.state(StateNamespaces.window(null, first), firstHoldAddress); + firstHold.add(new Instant(224L)); + + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); + + StateTag> secondHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState secondHold = + internals.state(StateNamespaces.window(null, second), secondHoldAddress); + secondHold.add(new Instant(24L)); + + internals.commit(); + assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(24L))); + } + + @Test + public void testGetEarliestHoldBeforeCommit() { + CopyOnAccessInMemoryStateInternals internals = + CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); + + internals + .state( + StateNamespaces.global(), + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp())) + .add(new Instant(1234L)); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage(CopyOnAccessInMemoryStateInternals.class.getSimpleName()); + thrown.expectMessage("Can't get the earliest watermark hold"); + thrown.expectMessage("before it is committed"); + + internals.getEarliestWatermarkHold(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index a2bb15e..f11c370 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -58,7 +58,6 @@ import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index 06c85ef..7c086a1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -51,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java deleted file mode 100644 index 3ca00a9..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java +++ /dev/null @@ -1,453 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util.state; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryState; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; -import org.joda.time.Instant; - -/** - * {@link StateInternals} built on top of an underlying {@link StateTable} that contains instances - * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is - * accessed, an independent copy will be created within this table. - */ -public class CopyOnAccessInMemoryStateInternals implements StateInternals { - private final K key; - private final CopyOnAccessInMemoryStateTable table; - - /** - * Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null) - * StateInternals. - */ - public static CopyOnAccessInMemoryStateInternals withUnderlying( - K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) { - return new CopyOnAccessInMemoryStateInternals(key, underlying); - } - - private CopyOnAccessInMemoryStateInternals( - K key, CopyOnAccessInMemoryStateInternals underlying) { - this.key = key; - table = - new CopyOnAccessInMemoryStateTable(key, underlying == null ? null : underlying.table); - } - - /** - * Ensures this {@link CopyOnAccessInMemoryStateInternals} is complete. 
Other copies of state for - * the same Step and Key may be discarded after invoking this method. - * - *

<p>For each {@link StateNamespace}, for each {@link StateTag address} in that namespace that - * has not been bound in this {@link CopyOnAccessInMemoryStateInternals}, put a reference to that - * state within this {@link StateInternals}. - * - *

<p>Additionally, stores the {@link WatermarkHoldState} with the earliest time bound in the - * state table after the commit is completed, enabling calls to - * {@link #getEarliestWatermarkHold()}. - * - * @return this table - */ - public CopyOnAccessInMemoryStateInternals commit() { - table.commit(); - return this; - } - - /** - * Gets the earliest Watermark Hold present in this table. - * - *

Must be called after this state has been committed. Will throw an - * {@link IllegalStateException} if the state has not been committed. - */ - public Instant getEarliestWatermarkHold() { - // After commit, the watermark hold is always present, but may be - // BoundedWindow#TIMESTAMP_MAX_VALUE if there is no hold set. - checkState( - table.earliestWatermarkHold.isPresent(), - "Can't get the earliest watermark hold in a %s before it is committed", - getClass().getSimpleName()); - return table.earliestWatermarkHold.get(); - } - - @Override - public T state(StateNamespace namespace, StateTag address) { - return state(namespace, address, StateContexts.nullContext()); - } - - @Override - public T state( - StateNamespace namespace, StateTag address, StateContext c) { - return table.get(namespace, address, c); - } - - @Override - public K getKey() { - return key; - } - - public boolean isEmpty() { - return Iterables.isEmpty(table.values()); - } - - /** - * A {@link StateTable} that, when a value is retrieved with - * {@link StateTable#get(StateNamespace, StateTag, StateContext)}, first attempts to obtain a - * copy of existing {@link State} from an underlying {@link StateTable}. - */ - private static class CopyOnAccessInMemoryStateTable extends StateTable { - private final K key; - private Optional> underlying; - - /** - * The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}. - * - *

<p>There are three {@link StateBinderFactory} implementations used by the {@link - * CopyOnAccessInMemoryStateTable}. - *

<ul> - *
<li>The default {@link StateBinderFactory} is a {@link CopyOnBindBinderFactory}, allowing - * the table to copy any existing {@link State} values to this {@link StateTable} from the - * underlying table when accessed, at which point mutations will not be visible to the - * underlying table - effectively a "Copy by Value" binder.</li> - *
<li>During the execution of the {@link #commit()} method, this is a - * {@link ReadThroughBinderFactory}, which copies the references to the existing - * {@link State} objects to this {@link StateTable}.</li> - *
<li>After the execution of the {@link #commit()} method, this is an - * instance of {@link InMemoryStateBinderFactory}, which constructs new instances of state - * when a {@link StateTag} is bound.</li> - *
</ul>
- */ - private StateBinderFactory binderFactory; - - /** - * The earliest watermark hold in this table. - */ - private Optional earliestWatermarkHold; - - public CopyOnAccessInMemoryStateTable(K key, StateTable underlying) { - this.key = key; - this.underlying = Optional.fromNullable(underlying); - binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying); - earliestWatermarkHold = Optional.absent(); - } - - /** - * Copies all values in the underlying table to this table, then discards the underlying table. - * - *

<p>If there is an underlying table, this replaces the existing - * {@link CopyOnBindBinderFactory} with a {@link ReadThroughBinderFactory}, then reads all of - * the values in the existing table, binding the state values to this table. The old StateTable - * should be discarded after the call to {@link #commit()}. - * - *

After copying all of the existing values, replace the binder factory with an instance of - * {@link InMemoryStateBinderFactory} to construct new values, since all existing values - * are bound in this {@link StateTable table} and this table represents the canonical state. - */ - private void commit() { - Instant earliestHold = getEarliestWatermarkHold(); - if (underlying.isPresent()) { - ReadThroughBinderFactory readThroughBinder = - new ReadThroughBinderFactory<>(underlying.get()); - binderFactory = readThroughBinder; - Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this); - if (earliestUnderlyingHold.isBefore(earliestHold)) { - earliestHold = earliestUnderlyingHold; - } - } - earliestWatermarkHold = Optional.of(earliestHold); - clearEmpty(); - binderFactory = new InMemoryStateBinderFactory<>(key); - underlying = Optional.absent(); - } - - /** - * Get the earliest watermark hold in this table. Ignores the contents of any underlying table. - */ - private Instant getEarliestWatermarkHold() { - Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (State existingState : this.values()) { - if (existingState instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState) existingState).read(); - if (hold != null && hold.isBefore(earliest)) { - earliest = hold; - } - } - } - return earliest; - } - - /** - * Clear all empty {@link StateNamespace StateNamespaces} from this table. If all states are - * empty, clear the entire table. - * - *

Because {@link InMemoryState} is not removed from the {@link StateTable} after it is - * cleared, in case contents are modified after being cleared, the table must be explicitly - * checked to ensure that it contains state and removed if not (otherwise we may never use - * the table again). - */ - private void clearEmpty() { - Collection emptyNamespaces = new HashSet<>(this.getNamespacesInUse()); - for (StateNamespace namespace : this.getNamespacesInUse()) { - for (State existingState : this.getTagsInUse(namespace).values()) { - if (!((InMemoryState) existingState).isCleared()) { - emptyNamespaces.remove(namespace); - break; - } - } - } - for (StateNamespace empty : emptyNamespaces) { - this.clearNamespace(empty); - } - } - - @Override - protected StateBinder binderForNamespace(final StateNamespace namespace, StateContext c) { - return binderFactory.forNamespace(namespace, c); - } - - private interface StateBinderFactory { - StateBinder forNamespace(StateNamespace namespace, StateContext c); - } - - /** - * {@link StateBinderFactory} that creates a copy of any existing state when the state is bound. 
- */ - private static class CopyOnBindBinderFactory implements StateBinderFactory { - private final K key; - private final Optional> underlying; - - public CopyOnBindBinderFactory(K key, Optional> underlying) { - this.key = key; - this.underlying = underlying; - } - - private boolean containedInUnderlying(StateNamespace namespace, StateTag tag) { - return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace) - && underlying.get().getTagsInUse(namespace).containsKey(tag); - } - - @Override - public StateBinder forNamespace(final StateNamespace namespace, final StateContext c) { - return new StateBinder() { - @Override - public WatermarkHoldState bindWatermark( - StateTag> address, - OutputTimeFn outputTimeFn) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryWatermarkHold<>( - outputTimeFn); - } - } - - @Override - public ValueState bindValue( - StateTag> address, Coder coder) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryValue<>(); - } - } - - @Override - public AccumulatorCombiningState - bindCombiningValue( - StateTag> address, - Coder accumCoder, CombineFn combineFn) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> - existingState = ( - InMemoryStateInternals - .InMemoryState>) underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryCombiningValue<>( - key, combineFn.asKeyedFn()); - } - } - - @Override - public BagState bindBag( - 
StateTag> address, Coder elemCoder) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryBag<>(); - } - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValue( - StateTag> address, - Coder accumCoder, - KeyedCombineFn combineFn) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> - existingState = ( - InMemoryStateInternals - .InMemoryState>) underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryCombiningValue<>(key, combineFn); - } - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateTag> address, - Coder accumCoder, - KeyedCombineFnWithContext combineFn) { - return bindKeyedCombiningValue( - address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); - } - }; - } - } - - /** - * {@link StateBinderFactory} that reads directly from the underlying table. Used during calls - * to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from - * the underlying table. - */ - private static class ReadThroughBinderFactory implements StateBinderFactory { - private final StateTable underlying; - - public ReadThroughBinderFactory(StateTable underlying) { - this.underlying = underlying; - } - - public Instant readThroughAndGetEarliestHold(StateTable readTo) { - Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (StateNamespace namespace : underlying.getNamespacesInUse()) { - for (Map.Entry, ? 
extends State> existingState : - underlying.getTagsInUse(namespace).entrySet()) { - if (!((InMemoryState) existingState.getValue()).isCleared()) { - // Only read through non-cleared values to ensure that completed windows are - // eventually discarded, and remember the earliest watermark hold from among those - // values. - State state = - readTo.get(namespace, existingState.getKey(), StateContexts.nullContext()); - if (state instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState) state).read(); - if (hold != null && hold.isBefore(earliestHold)) { - earliestHold = hold; - } - } - } - } - } - return earliestHold; - } - - @Override - public StateBinder forNamespace(final StateNamespace namespace, final StateContext c) { - return new StateBinder() { - @Override - public WatermarkHoldState bindWatermark( - StateTag> address, - OutputTimeFn outputTimeFn) { - return underlying.get(namespace, address, c); - } - - @Override - public ValueState bindValue( - StateTag> address, Coder coder) { - return underlying.get(namespace, address, c); - } - - @Override - public AccumulatorCombiningState - bindCombiningValue( - StateTag> address, - Coder accumCoder, CombineFn combineFn) { - return underlying.get(namespace, address, c); - } - - @Override - public BagState bindBag( - StateTag> address, Coder elemCoder) { - return underlying.get(namespace, address, c); - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValue( - StateTag> address, - Coder accumCoder, - KeyedCombineFn combineFn) { - return underlying.get(namespace, address, c); - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateTag> address, - Coder accumCoder, - KeyedCombineFnWithContext combineFn) { - return bindKeyedCombiningValue( - address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); - } - }; - } - } - - private static class InMemoryStateBinderFactory implements StateBinderFactory { - private final K key; - - public 
InMemoryStateBinderFactory(K key) { - this.key = key; - } - - @Override - public StateBinder forNamespace(StateNamespace namespace, StateContext c) { - return new InMemoryStateInternals.InMemoryStateBinder<>(key, c); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java index efb270c..6611837 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java @@ -56,7 +56,11 @@ public class InMemoryStateInternals implements StateInternals { return key; } - interface InMemoryState> { + /** + * Interface common to all in-memory state cells. Includes ability to see whether a cell has been + * cleared and the ability to create a clone of the contents. + */ + public interface InMemoryState> { boolean isCleared(); T copy(); } @@ -94,11 +98,11 @@ public class InMemoryStateInternals implements StateInternals { /** * A {@link StateBinder} that returns In Memory {@link State} objects. */ - static class InMemoryStateBinder implements StateBinder { + public static class InMemoryStateBinder implements StateBinder { private final K key; private final StateContext c; - InMemoryStateBinder(K key, StateContext c) { + public InMemoryStateBinder(K key, StateContext c) { this.key = key; this.c = c; } @@ -150,7 +154,11 @@ public class InMemoryStateInternals implements StateInternals { } } - static final class InMemoryValue implements ValueState, InMemoryState> { + /** + * An {@link InMemoryState} implementation of {@link ValueState}. 
+ */ + public static final class InMemoryValue<T> + implements ValueState<T>, InMemoryState<InMemoryValue<T>> { private boolean isCleared = true; private T value = null; @@ -194,7 +202,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } } - static final class InMemoryWatermarkHold<W extends BoundedWindow> + /** + * An {@link InMemoryState} implementation of {@link WatermarkHoldState}. + */ + public static final class InMemoryWatermarkHold<W extends BoundedWindow> implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> { private final OutputTimeFn<? super W> outputTimeFn; @@ -267,7 +278,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } } - static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT> + /** + * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}. + */ + public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT> implements AccumulatorCombiningState<InputT, AccumT, OutputT>, InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> { private final K key; @@ -275,7 +289,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; private AccumT accum; - InMemoryCombiningValue( + public InMemoryCombiningValue( K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { this.key = key; this.combineFn = combineFn; @@ -353,7 +367,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } } - static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> { + /** + * An {@link InMemoryState} implementation of {@link BagState}. + */ + public static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> { private List<T> contents = new ArrayList<>(); @Override