beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [3/4] beam git commit: jstorm-runner: support SourceMetrics.
Date Fri, 08 Sep 2017 06:43:55 GMT
jstorm-runner: support SourceMetrics.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fec423e5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fec423e5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fec423e5

Branch: refs/heads/jstorm-runner
Commit: fec423e51148d26495fb8e6d17ac204b161f3069
Parents: 0c38844
Author: Pei He <pei@apache.org>
Authored: Wed Sep 6 12:37:21 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Fri Sep 8 14:42:28 2017 +0800

----------------------------------------------------------------------
 runners/jstorm/pom.xml                          |  1 -
 .../translation/UnboundedSourceSpout.java       | 30 ++++++++++++++------
 2 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fec423e5/runners/jstorm/pom.xml
----------------------------------------------------------------------
diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml
index 681adb5..a433fcb 100644
--- a/runners/jstorm/pom.xml
+++ b/runners/jstorm/pom.xml
@@ -90,7 +90,6 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.UsesSetState,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
-                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
                     org.apache.beam.sdk.testing.UsesTestStream
                   </excludedGroups>

http://git-wip-us.apache.org/repos/asf/beam/blob/fec423e5/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
index 73f1f0d..92d2f24 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
@@ -26,12 +26,14 @@ import backtype.storm.tuple.Values;
 import com.alibaba.jstorm.metric.MetricClient;
 import com.alibaba.jstorm.metrics.Gauge;
 import com.alibaba.jstorm.utils.KryoSerializer;
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -49,7 +51,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
 
-  private final String name;
+  private final String stepName;
   private final String description;
   private final UnboundedSource source;
   private final SerializedPipelineOptions serializedOptions;
@@ -58,6 +60,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
   private transient JStormPipelineOptions pipelineOptions;
   private transient UnboundedSource.UnboundedReader reader;
   private transient SpoutOutputCollector collector;
+  private transient MetricsReporter metricsReporter;
 
   private volatile boolean hasNextRecord;
   private AtomicBoolean activated = new AtomicBoolean();
@@ -67,12 +70,12 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
   private long lastWaterMark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
 
   public UnboundedSourceSpout(
-      String name,
+      String stepName,
       String description,
       UnboundedSource source,
       JStormPipelineOptions options,
       TupleTag<?> outputTag) {
-    this.name = name;
+    this.stepName = checkNotNull(stepName, "stepName");
     this.description = checkNotNull(description, "description");
     this.source = checkNotNull(source, "source");
     this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
@@ -121,18 +124,23 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
     this.pipelineOptions =
         this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
     this.serializer = new KryoSerializer<>(conf);
-    createSourceReader(null);
-    new MetricClient(context).registerGauge(
+    // init metrics
+    MetricClient metricClient = new MetricClient(context);
+    metricClient.registerGauge(
         context.getThisComponentId() + CommonInstance.BEAM_SOURCE_WATERMARK_METRICS,
         new Gauge<Double>() {
           @Override
           public Double getValue() {
             return (double) reader.getWatermark().getMillis();
           }});
+    metricsReporter = MetricsReporter.create(metricClient);
+
+    createSourceReader(null);
   }
 
   public synchronized void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) {
-    try {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
       if (reader != null) {
         reader.close();
       }
@@ -148,11 +156,13 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
     if (!activated.get()) {
       return;
     }
-    try {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
       if (!hasNextRecord) {
         hasNextRecord = reader.advance();
       }
 
+      boolean emitted = false;
       while (hasNextRecord && activated.get()) {
         Object value = reader.getCurrent();
         Instant timestamp = reader.getCurrentTimestamp();
@@ -169,10 +179,14 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
           byte[] immutableValue = serializer.serialize(wv);
           collector.emit(outputTag.getId(), new Values(immutableValue));
         }
+        emitted = true;
 
         // move to next record
         hasNextRecord = reader.advance();
       }
+      if (emitted) {
+        metricsReporter.updateMetrics();
+      }
 
       Instant waterMark = reader.getWatermark();
       if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
@@ -187,7 +201,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
   }
 
   public String getName() {
-    return name;
+    return stepName;
   }
 
   public TupleTag getOutputTag() {


Mime
View raw message