beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Use the ParDo Application to Cache DoFns
Date Wed, 20 Jul 2016 17:56:03 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master f547f70e1 -> 436e4a34e


Use the ParDo Application to Cache DoFns

A DoFn's application (its AppliedPTransform), rather than the DoFn instance itself, is the scope of reuse.

Promote CloningThreadLocal to a top-level class in place of
SerializableCloningThreadLocalCacheLoader, and extract the DoFn from the
AppliedPTransform when loading an absent cache entry.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00195d25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00195d25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00195d25

Branch: refs/heads/master
Commit: 00195d2543eb347cc3669a4ac89e98da0bc4dca4
Parents: f0119b2
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Jun 28 15:44:49 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Jul 20 10:55:32 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CloningThreadLocal.java | 43 +++++++++
 .../direct/ParDoMultiEvaluatorFactory.java      | 44 ++++++---
 .../direct/ParDoSingleEvaluatorFactory.java     | 50 ++++++----
 ...rializableCloningThreadLocalCacheLoader.java | 54 -----------
 .../runners/direct/CloningThreadLocalTest.java  | 92 ++++++++++++++++++
 ...izableCloningThreadLocalCacheLoaderTest.java | 99 --------------------
 6 files changed, 198 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningThreadLocal.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningThreadLocal.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningThreadLocal.java
new file mode 100644
index 0000000..b9dc4ca
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningThreadLocal.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.util.SerializableUtils;
+
+import java.io.Serializable;
+
+/**
+ * A {@link ThreadLocal} that obtains the initial value by cloning an original value.
+ *
+ * <p>Whenever a thread has no current value (on its first {@link #get()}, or after
+ * {@link #remove()}), {@link #initialValue()} supplies a new copy of the original produced by
+ * {@link SerializableUtils#clone}. Each thread therefore works on its own copy, and the
+ * original held here is only ever used as the source of clones.
+ */
+class CloningThreadLocal<T extends Serializable> extends ThreadLocal<T> {
+  /**
+   * Creates a {@link CloningThreadLocal} whose per-thread initial values are clones of
+   * {@code original}.
+   *
+   * @param original the value to clone for each thread
+   * @return a new {@link CloningThreadLocal} backed by {@code original}
+   */
+  public static <T extends Serializable> CloningThreadLocal<T> of(T original) {
+    return new CloningThreadLocal<>(original);
+  }
+
+  /** The value cloned to produce each thread's initial value; never handed out directly. */
+  private final T original;
+
+  private CloningThreadLocal(T original) {
+    this.original = original;
+  }
+
+  /**
+   * Returns a fresh clone of the original value. {@link ThreadLocal} invokes this the first time a
+   * thread calls {@link #get()} and again after that thread calls {@link #remove()}.
+   */
+  @Override
+  public T initialValue() {
+    // NOTE(review): ThreadLocal#initialValue is protected; declaring it public here lets callers
+    // invoke it directly and obtain clones that bypass the per-thread slot. Consider narrowing
+    // this back to protected.
+    return SerializableUtils.clone(original);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index e008bdc..b87cd3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
 import java.util.Map;
@@ -36,11 +37,24 @@ import java.util.Map;
  * {@link BoundMulti} primitive {@link PTransform}.
  */
 class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
-  private final LoadingCache<DoFn<?, ?>, ThreadLocal<DoFn<?, ?>>>
fnClones;
+  private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?,
?>>>
+      fnClones;
 
   public ParDoMultiEvaluatorFactory() {
-    fnClones = CacheBuilder.newBuilder()
-        .build(SerializableCloningThreadLocalCacheLoader.<DoFn<?, ?>>create());
+    fnClones =
+        CacheBuilder.newBuilder()
+            .build(
+                new CacheLoader<
+                    AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?,
?>>>() {
+                  @Override
+                  public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?,
?, BoundMulti<?, ?>> key)
+                      throws Exception {
+                    @SuppressWarnings({"unchecked", "rawtypes"})
+                    ThreadLocal threadLocal =
+                        (ThreadLocal) CloningThreadLocal.of(key.getTransform().getFn());
+                    return threadLocal;
+                  }
+                });
   }
 
   @Override
@@ -59,19 +73,21 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
       CommittedBundle<InT> inputBundle,
       EvaluationContext evaluationContext) {
     Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
-    DoFn<InT, OuT> fn = application.getTransform().getFn();
 
-    @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InT, OuT>>
fnLocal =
-        (ThreadLocal) fnClones.getUnchecked(application.getTransform().getFn());
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    ThreadLocal<DoFn<InT, OuT>> fnLocal =
+        (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
     try {
-      TransformEvaluator<InT> parDoEvaluator = ParDoEvaluator.create(evaluationContext,
-          inputBundle,
-          application,
-          fnLocal.get(),
-          application.getTransform().getSideInputs(),
-          application.getTransform().getMainOutputTag(),
-          application.getTransform().getSideOutputTags().getAll(),
-          outputs);
+      TransformEvaluator<InT> parDoEvaluator =
+          ParDoEvaluator.create(
+              evaluationContext,
+              inputBundle,
+              application,
+              fnLocal.get(),
+              application.getTransform().getSideInputs(),
+              application.getTransform().getMainOutputTag(),
+              application.getTransform().getSideOutputTags().getAll(),
+              outputs);
       return ThreadLocalInvalidatingTransformEvaluator.wrapping(parDoEvaluator, fnLocal);
     } catch (Exception e) {
       fnLocal.remove();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 0f7fc83..e9c7dd6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableMap;
 
@@ -36,11 +37,23 @@ import java.util.Collections;
  * {@link Bound ParDo.Bound} primitive {@link PTransform}.
  */
 class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
-  private final LoadingCache<DoFn<?, ?>, ThreadLocal<DoFn<?, ?>>>
fnClones;
+  private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?,
?>>>
+      fnClones;
 
   public ParDoSingleEvaluatorFactory() {
-    fnClones = CacheBuilder.newBuilder()
-        .build(SerializableCloningThreadLocalCacheLoader.<DoFn<?, ?>>create());
+    fnClones =
+        CacheBuilder.newBuilder()
+            .build(
+                new CacheLoader<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?,
?>>>() {
+                  @Override
+                  public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?,
?, Bound<?, ?>> key)
+                      throws Exception {
+                    @SuppressWarnings({"unchecked", "rawtypes"})
+                    ThreadLocal threadLocal =
+                        (ThreadLocal) CloningThreadLocal.of(key.getTransform().getFn());
+                    return threadLocal;
+                  }
+                });
   }
 
   @Override
@@ -55,23 +68,26 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
   }
 
   private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator(
-      @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
-          Bound<InputT, OutputT>> application,
-      CommittedBundle<InputT> inputBundle, EvaluationContext evaluationContext) {
+      AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, Bound<InputT,
OutputT>>
+          application,
+      CommittedBundle<InputT> inputBundle,
+      EvaluationContext evaluationContext) {
     TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
 
-    @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InputT, OutputT>>
fnLocal =
-        (ThreadLocal) fnClones.getUnchecked(application.getTransform().getFn());
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
+        (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
     try {
-      ParDoEvaluator<InputT> parDoEvaluator = ParDoEvaluator.create(
-          evaluationContext,
-          inputBundle,
-          application,
-          fnLocal.get(),
-          application.getTransform().getSideInputs(),
-          mainOutputTag,
-          Collections.<TupleTag<?>>emptyList(),
-          ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
+      ParDoEvaluator<InputT> parDoEvaluator =
+          ParDoEvaluator.create(
+              evaluationContext,
+              inputBundle,
+              application,
+              fnLocal.get(),
+              application.getTransform().getSideInputs(),
+              mainOutputTag,
+              Collections.<TupleTag<?>>emptyList(),
+              ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag,
application.getOutput()));
       return ThreadLocalInvalidatingTransformEvaluator.wrapping(parDoEvaluator, fnLocal);
     } catch (Exception e) {
       fnLocal.remove();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoader.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoader.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoader.java
deleted file mode 100644
index a3703d9..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoader.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.util.SerializableUtils;
-
-import com.google.common.cache.CacheLoader;
-
-import java.io.Serializable;
-
-/**
- * A {@link CacheLoader} that loads {@link ThreadLocal ThreadLocals} with initial values
equal to
- * the clone of the key.
- */
-class SerializableCloningThreadLocalCacheLoader<T extends Serializable>
-    extends CacheLoader<T, ThreadLocal<T>> {
-  public static <T extends Serializable> CacheLoader<T, ThreadLocal<T>>
create() {
-    return new SerializableCloningThreadLocalCacheLoader<T>();
-  }
-
-  @Override
-  public ThreadLocal<T> load(T key) throws Exception {
-    return new CloningThreadLocal<>(key);
-  }
-
-  private static class CloningThreadLocal<T extends Serializable> extends ThreadLocal<T>
{
-    private final T original;
-
-    public CloningThreadLocal(T value) {
-      this.original = value;
-    }
-
-    @Override
-    public T initialValue() {
-      return SerializableUtils.clone(original);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningThreadLocalTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningThreadLocalTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningThreadLocalTest.java
new file mode 100644
index 0000000..298db46
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningThreadLocalTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.core.IsNot.not;
+import static org.hamcrest.core.IsSame.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link CloningThreadLocal}.
+ */
+@RunWith(JUnit4.class)
+public class CloningThreadLocalTest {
+  /** The value read on the calling thread is non-null, equal to the original, but not the same. */
+  @Test
+  public void returnsCopiesOfOriginal() throws Exception {
+    Record original = new Record();
+    ThreadLocal<Record> loaded = CloningThreadLocal.of(original);
+    assertThat(loaded.get(), not(nullValue()));
+    assertThat(loaded.get(), equalTo(original));
+    assertThat(loaded.get(), not(theInstance(original)));
+  }
+
+  /**
+   * Reads from the calling thread and from two other threads all yield equal values, and each
+   * thread gets a distinct instance.
+   */
+  @Test
+  public void returnsDifferentCopiesInDifferentThreads() throws Exception {
+    Record original = new Record();
+    final ThreadLocal<Record> loaded = CloningThreadLocal.of(original);
+    assertThat(loaded.get(), not(nullValue()));
+    assertThat(loaded.get(), equalTo(original));
+    assertThat(loaded.get(), not(theInstance(original)));
+
+    // Reads the thread-local on whichever thread executes this Callable.
+    Callable<Record> otherThread =
+        new Callable<Record>() {
+          @Override
+          public Record call() throws Exception {
+            return loaded.get();
+          }
+        };
+    Record sameThread = loaded.get();
+    // Each call creates its own single-thread executor, so the two reads run on two different
+    // worker threads. NOTE(review): neither executor is shut down here; consider calling
+    // shutdown() so the worker threads are not left running after the test.
+    Record firstOtherThread = Executors.newSingleThreadExecutor().submit(otherThread).get();
+    Record secondOtherThread = Executors.newSingleThreadExecutor().submit(otherThread).get();
+
+    assertThat(sameThread, equalTo(firstOtherThread));
+    assertThat(sameThread, equalTo(secondOtherThread));
+    assertThat(sameThread, not(theInstance(firstOtherThread)));
+    assertThat(sameThread, not(theInstance(secondOtherThread)));
+    assertThat(firstOtherThread, not(theInstance(secondOtherThread)));
+  }
+
+  /**
+   * A serializable value whose equality is based on a random field.
+   *
+   * <p>Two independently constructed instances are (almost certainly) unequal. A clone compares
+   * equal to its source, presumably because SerializableUtils.clone round-trips through Java
+   * serialization, which restores {@code rand} from the stream instead of re-running its
+   * initializer.
+   */
+  private static class Record implements Serializable {
+    private final double rand = Math.random();
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof Record)) {
+        return false;
+      }
+      Record that = (Record) other;
+      return this.rand == that.rand;
+    }
+
+    // A constant hash is consistent with equals (equal records share it); it is only inefficient
+    // in hash-based collections, which these tests do not use.
+    @Override
+    public int hashCode() {
+      return 1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00195d25/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoaderTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoaderTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoaderTest.java
deleted file mode 100644
index c451eec..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SerializableCloningThreadLocalCacheLoaderTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.core.IsNot.not;
-import static org.hamcrest.core.IsSame.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-
-/**
- * Tests for {@link SerializableCloningThreadLocalCacheLoader}.
- */
-@RunWith(JUnit4.class)
-public class SerializableCloningThreadLocalCacheLoaderTest {
-  private SerializableCloningThreadLocalCacheLoader<Record> loader;
-
-  @Before
-  public void setup() {
-    loader = new SerializableCloningThreadLocalCacheLoader();
-  }
-
-  @Test
-  public void returnsCopiesOfOriginal() throws Exception {
-    Record original = new Record();
-    ThreadLocal<Record> loaded = loader.load(original);
-    assertThat(loaded.get(), not(nullValue()));
-    assertThat(loaded.get(), equalTo(original));
-    assertThat(loaded.get(), not(theInstance(original)));
-  }
-
-  @Test
-  public void returnsDifferentCopiesInDifferentThreads() throws Exception {
-    Record original = new Record();
-    final ThreadLocal<Record> loaded = loader.load(original);
-    assertThat(loaded.get(), not(nullValue()));
-    assertThat(loaded.get(), equalTo(original));
-    assertThat(loaded.get(), not(theInstance(original)));
-
-    Callable<Record> otherThread = new Callable<Record>() {
-      @Override
-      public Record call() throws Exception {
-        return loaded.get();
-      }
-    };
-    Record sameThread = loaded.get();
-    Record firstOtherThread = Executors.newSingleThreadExecutor().submit(otherThread).get();
-    Record secondOtherThread = Executors.newSingleThreadExecutor().submit(otherThread).get();
-
-    assertThat(sameThread, equalTo(firstOtherThread));
-    assertThat(sameThread, equalTo(secondOtherThread));
-    assertThat(sameThread, not(theInstance(firstOtherThread)));
-    assertThat(sameThread, not(theInstance(secondOtherThread)));
-    assertThat(firstOtherThread, not(theInstance(secondOtherThread)));
-  }
-
-  private static class Record implements Serializable {
-    private final double rand = Math.random();
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof Record)) {
-        return false;
-      }
-      Record that = (Record) other;
-      return this.rand == that.rand;
-    }
-
-    @Override
-    public int hashCode() {
-      return 1;
-    }
-  }
-}


Mime
View raw message