beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [1/2] beam git commit: JStorm-runner: Performance improvement 1. remove some logs on critical path 2. register "TimestampedValue" in Kryo to reduce the serialized size of event value
Date Fri, 08 Sep 2017 06:27:42 GMT
Repository: beam
Updated Branches:
  refs/heads/jstorm-runner d24d2831d -> d2b285122


JStorm-runner: Performance improvement
1. remove some logs on critical path
2. register "TimestampedValue" in Kryo to reduce the serialized size of event value


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43492000
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43492000
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43492000

Branch: refs/heads/jstorm-runner
Commit: 43492000a49e81b6d9a2420148fb2df1735301b0
Parents: d24d283
Author: basti.lj <basti.lj@alibaba-inc.com>
Authored: Fri Sep 8 12:19:49 2017 +0800
Committer: basti.lj <basti.lj@alibaba-inc.com>
Committed: Fri Sep 8 12:19:49 2017 +0800

----------------------------------------------------------------------
 .../jstorm/serialization/BeamUtilsSerializer.java   |  2 ++
 .../runners/jstorm/translation/DoFnExecutor.java    |  3 +--
 .../runners/jstorm/translation/ExecutorsBolt.java   |  1 -
 .../jstorm/translation/WindowAssignExecutor.java    | 16 +++++++---------
 4 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java
index db1f037..8061a9f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Instant;
 
 /**
@@ -110,5 +111,6 @@ public class BeamUtilsSerializer {
         Lists.<BoundedWindow>newArrayList(w1), PaneInfo.NO_FIRING).getClass());
     config.registerSerialization(WindowedValue.of(null, Instant.now(),
         Lists.<BoundedWindow>newArrayList(w1, w2), PaneInfo.NO_FIRING).getClass());
+    config.registerSerialization(TimestampedValue.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index 5425b6c..4b021a3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -214,7 +214,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
   }
 
   protected <T> void processMainInput(WindowedValue<T> elem) {
-    LOG.debug(String.format("Main input: tag=%s, elem=%s", mainInputTag, elem));
     if (sideInputs.isEmpty()) {
       runner.processElement((WindowedValue<InputT>) elem);
     } else {
@@ -236,7 +235,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
   }
 
   protected void processSideInput(TupleTag tag, WindowedValue elem) {
-    LOG.debug(String.format("Side inputs: tag=%s, elem=%s.", tag, elem));
+    LOG.debug("Side inputs: tag={}, elem={}.", tag, elem);
 
     PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
     sideInputHandler.addSideInputValue(sideInputView, elem);

http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index aca2ca4..1e9a4ff 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -295,7 +295,6 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt
{
 
   public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T>
elem) {
     if (elem != null) {
-      LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag);
       Executor executor = inputTagToExecutor.get(inputTag);
       if (executor != null) {
         executor.process(inputTag, elem);

http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
index 832c95c..ffbfb1b 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.jstorm.translation;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -46,13 +44,13 @@ class WindowAssignExecutor<T, W extends BoundedWindow> implements
Executor {
 
     JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value)
{
       fn.super();
-      checkArgument(
-          Iterables.size(value.getWindows()) == 1,
-          String.format(
-              "%s passed to window assignment must be in a single window, but it was in %s:
%s",
-              WindowedValue.class.getSimpleName(),
-              Iterables.size(value.getWindows()),
-              value.getWindows()));
+      if (value.getWindows().size() != 1) {
+        throw new IllegalArgumentException(String.format(
+            "%s passed to window assignment must be in a single window, but it was in %s:
%s",
+            WindowedValue.class.getSimpleName(),
+            Iterables.size(value.getWindows()),
+            value.getWindows()));
+      }
       this.value = value;
     }
 


Mime
View raw message