beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [beam] branch master updated: move MonitoringInfo protos to model/pipeline module
Date Wed, 27 Feb 2019 12:08:06 GMT
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cd76f28  move MonitoringInfo protos to model/pipeline module
     new 7206b9b  Merge pull request #7938: [BEAM-4775] move MonitoringInfo protos to model/pipeline module
cd76f28 is described below

commit cd76f28611670653c917e406306ce8870df0a073
Author: Ryan Williams <ryan.blake.williams@gmail.com>
AuthorDate: Sat Feb 23 06:33:34 2019 +0000

    move MonitoringInfo protos to model/pipeline module
---
 .../fn-execution/src/main/proto/beam_fn_api.proto  | 296 +------------------
 .../src/main/proto/beam_job_api.proto              |   1 -
 model/pipeline/src/main/proto/metrics.proto        | 324 +++++++++++++++++++++
 .../beam/runners/core/metrics/MetricUrns.java      |   2 +-
 .../runners/core/metrics/MetricsContainerImpl.java |   2 +-
 .../core/metrics/MetricsContainerStepMap.java      |   2 +-
 .../core/metrics/SimpleMonitoringInfoBuilder.java  |  22 +-
 .../runners/core/metrics/SimpleStateRegistry.java  |   2 +-
 .../core/metrics/SpecMonitoringInfoValidator.java  |  12 +-
 .../core/metrics/MetricsContainerImplTest.java     |   2 +-
 .../core/metrics/MetricsContainerStepMapTest.java  |   2 +-
 .../core/metrics/MonitoringInfoMatchers.java       |   2 +-
 .../core/metrics/MonitoringInfoTestUtil.java       |   2 +-
 .../metrics/SimpleMonitoringInfoBuilderTest.java   |   2 +-
 .../core/metrics/SimpleStateRegistryTest.java      |   2 +-
 .../metrics/SpecMonitoringInfoValidatorTest.java   |   2 +-
 .../flink/metrics/FlinkMetricContainer.java        |  12 +-
 .../flink/metrics/FlinkMetricContainerTest.java    |  20 +-
 .../worker/fn/control/BeamFnMapTaskExecutor.java   |   2 +-
 ...piMonitoringInfoToCounterUpdateTransformer.java |   2 +-
 ...ecMonitoringInfoToCounterUpdateTransformer.java |   2 +-
 .../MonitoringInfoToCounterUpdateTransformer.java  |   2 +-
 .../control/RegisterAndProcessBundleOperation.java |   2 +-
 .../dataflow/worker/fn/control/TimerReceiver.java  |   8 +-
 ...erMonitoringInfoToCounterUpdateTransformer.java |  13 +-
 .../fn/control/BeamFnMapTaskExecutorTest.java      |  95 +++---
 ...nitoringInfoToCounterUpdateTransformerTest.java |   2 +-
 ...nitoringInfoToCounterUpdateTransformerTest.java |   2 +-
 ...nitoringInfoToCounterUpdateTransformerTest.java |   2 +-
 .../fnexecution/control/RemoteExecutionTest.java   |   2 +-
 .../fn/harness/control/ProcessBundleHandler.java   |   2 +-
 .../harness/data/PCollectionConsumerRegistry.java  |   2 +-
 .../harness/data/PTransformFunctionRegistry.java   |   2 +-
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       |   2 +-
 .../data/ElementCountFnDataReceiverTest.java       |   2 +-
 sdks/python/apache_beam/metrics/cells.py           |  15 +-
 .../python/apache_beam/metrics/monitoring_infos.py |   6 +-
 .../python/apache_beam/options/pipeline_options.py |   1 +
 sdks/python/apache_beam/portability/common_urns.py |   6 +-
 39 files changed, 452 insertions(+), 429 deletions(-)

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 7ab693f..e96a8cc 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -43,6 +43,7 @@ import "endpoints.proto";
 import "google/protobuf/descriptor.proto";
 import "google/protobuf/timestamp.proto";
 import "google/protobuf/wrappers.proto";
+import "metrics.proto";
 
 /*
  * Constructs that define the pipeline shape.
@@ -254,7 +255,7 @@ message BundleApplication {
   // will use consume when providing a UI or for making scaling and performance
   // decisions. See https://s.apache.org/beam-bundles-backlog-splitting for
   // details about what types of signals may be useful to report.
-  repeated MonitoringInfo monitoring_infos = 7;
+  repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 7;
 }
 
 // An Application should be scheduled for execution after a delay.
@@ -293,7 +294,7 @@ message ProcessBundleResponse {
 
   // (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
-  repeated MonitoringInfo monitoring_infos = 3;
+  repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
   // (Optional) Specifies that the runner must callback to this worker
   // once the output of the bundle is committed. The Runner must send a
@@ -311,295 +312,6 @@ message ProcessBundleProgressRequest {
   string instruction_reference = 1;
 }
 
-// A specification containing required set of fields and labels required
-// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
-// ProcessBundleResponse reporting.
-message MonitoringInfoSpec {
-  string urn = 1;
-  string type_urn = 2;
-  // The list of required
-  repeated string required_labels = 3;
-  // Extra non functional parts of the spec for descriptive purposes.
-  // i.e. description, units, etc.
-  repeated Annotation annotations = 4;
-}
-
-// The key name and value string of MonitoringInfo annotations.
-message Annotation {
-  string key = 1;
-  string value = 2;
-}
-
-// Populated MonitoringInfoSpecs for specific URNs.
-// Indicating the required fields to be set.
-// SDKs and RunnerHarnesses can load these instances into memory and write a
-// validator or code generator to assist with populating and validating
-// MonitoringInfo protos.
-message MonitoringInfoSpecs {
-  enum Enum {
-    // TODO(ajamato): Add the PTRANSFORM name as a required label after
-    // upgrading the python SDK.
-    USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user:",
-      type_urn: "beam:metrics:sum_int_64",
-    }];
-
-    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
-      urn: "beam:metric:element_count:v1",
-      type_urn: "beam:metrics:sum_int_64",
-      required_labels: [ "PCOLLECTION" ],
-      annotations: [ {
-        key: "description",
-        value: "The total elements output to a Pcollection by a PTransform."
-      } ]
-    }];
-
-    START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {
-      urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
-      type_urn: "beam:metrics:sum_int_64",
-      required_labels: [ "PTRANSFORM" ],
-      annotations: [ {
-        key: "description",
-        value: "The total estimated execution time of the start bundle"
-               "function in a pardo"
-      } ]
-    }];
-
-    PROCESS_BUNDLE_MSECS = 3 [(monitoring_info_spec) = {
-      urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
-      type_urn: "beam:metrics:sum_int_64",
-      required_labels: [ "PTRANSFORM" ],
-      annotations: [ {
-        key: "description",
-        value: "The total estimated execution time of the process bundle"
-               "function in a pardo"
-      } ]
-    }];
-
-    FINISH_BUNDLE_MSECS = 4 [(monitoring_info_spec) = {
-      urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
-      type_urn: "beam:metrics:sum_int_64",
-      required_labels: [ "PTRANSFORM" ],
-      annotations: [ {
-        key: "description",
-        value: "The total estimated execution time of the finish bundle "
-               "function in a pardo"
-      } ]
-    }];
-
-    TOTAL_MSECS = 5 [(monitoring_info_spec) = {
-      urn: "beam:metric:ptransform_execution_time:total_msecs:v1",
-      type_urn: "beam:metrics:sum_int_64",
-      required_labels: [ "PTRANSFORM" ],
-      annotations: [ {
-        key: "description",
-        value: "The total estimated execution time of the ptransform"
-      } ]
-    }];
-  }
-}
-
-// A set of properties for the MonitoringInfoLabel, this is useful to obtain
-// the proper label string for the MonitoringInfoLabel.
-message MonitoringInfoLabelProps {
-  // The label key to use in the MonitoringInfo labels map.
-  string name = 1;
-}
-
-// Enum extension to store MonitoringInfo related
-// specifications, constants, etc.
-extend google.protobuf.EnumValueOptions {
-  MonitoringInfoLabelProps label_props = 127337796;  // From: commit 0x7970544.
-
-  // Enum extension to store the MonitoringInfoSpecs.
-  MonitoringInfoSpec monitoring_info_spec = 207174266;
-}
-
-message MonitoringInfo {
-  // The name defining the metric or monitored state.
-  string urn = 1;
-
-  // This is specified as a URN that implies:
-  // A message class: (Distribution, Counter, Extrema, MonitoringDataTable).
-  // Sub types like field formats - int64, double, string.
-  // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
-  // valid values are:
-  // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
-  //     sum_double|latest_double|top_n_double|bottom_n_double|
-  //     distribution_int_64|distribution_double|monitoring_data_table
-  string type = 2;
-
-  // The Metric or monitored state.
-  oneof data {
-    MonitoringTableData monitoring_table_data = 3;
-    Metric metric = 4;
-  }
-
-  enum MonitoringInfoLabels {
-    // TODO(ajamato): Rename all references to TRANSFORM to PTRANSFORM
-    TRANSFORM = 0 [(label_props) = { name: "PTRANSFORM" }];
-    PCOLLECTION = 1 [(label_props) = { name: "PCOLLECTION" }];
-    WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
-    CODER = 3 [(label_props) = { name: "CODER" }];
-    ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
-  }
-  // A set of key+value labels which define the scope of the metric.
-  // Either a well defined entity id for matching the enum names in
-  // the MonitoringInfoLabels enum or any arbitrary label
-  // set by a custom metric or user metric.
-  // A monitoring system is expected to be able to aggregate the metrics
-  // together for all updates having the same URN and labels. Some systems such
-  // as Stackdriver will be able to aggregate the metrics using a subset of the
-  // provided labels
-  map<string, string> labels = 5;
-
-  // The walltime of the most recent update.
-  // Useful for aggregation for latest types such as LatestInt64.
-  google.protobuf.Timestamp timestamp = 6;
-}
-
-message MonitoringInfoUrns {
-  enum Enum {
-    // User counter have this format: 'beam:metric:user:<namespace>:<name>'.
-    USER_COUNTER_URN_PREFIX = 0
-        [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metric:user:"];
-
-    ELEMENT_COUNT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                           "beam:metric:element_count:v1"];
-
-    START_BUNDLE_MSECS = 2
-        [(org.apache.beam.model.pipeline.v1.beam_urn) =
-             "beam:metric:pardo_execution_time:start_bundle_msecs:v1"];
-
-    PROCESS_BUNDLE_MSECS = 3
-        [(org.apache.beam.model.pipeline.v1.beam_urn) =
-             "beam:metric:pardo_execution_time:process_bundle_msecs:v1"];
-
-    FINISH_BUNDLE_MSECS = 4
-        [(org.apache.beam.model.pipeline.v1.beam_urn) =
-             "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"];
-
-    TOTAL_MSECS = 5
-        [(org.apache.beam.model.pipeline.v1.beam_urn) =
-             "beam:metric:ptransform_execution_time:total_msecs:v1"];
-  }
-}
-
-message MonitoringInfoTypeUrns {
-  enum Enum {
-    SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                            "beam:metrics:sum_int_64"];
-
-    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                                     "beam:metrics:distribution_int_64"];
-
-    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-                               "beam:metrics:latest_int_64"];
-  }
-}
-
-message Metric {
-  // (Required) The data for this metric.
-  oneof data {
-    CounterData counter_data = 1;
-    DistributionData distribution_data = 2;
-    ExtremaData extrema_data = 3;
-  }
-}
-
-// Data associated with a Counter or Gauge metric.
-// This is designed to be compatible with metric collection
-// systems such as DropWizard.
-message CounterData {
-  oneof value {
-    int64 int64_value = 1;
-    double double_value = 2;
-    string string_value = 3;
-  }
-}
-
-// Extrema messages are used for calculating
-// Top-N/Bottom-N metrics.
-message ExtremaData {
-  oneof extrema {
-    IntExtremaData int_extrema_data = 1;
-    DoubleExtremaData double_extrema_data = 2;
-  }
-}
-
-message IntExtremaData {
-  repeated int64 int_values = 1;
-}
-
-message DoubleExtremaData {
-  repeated double double_values = 2;
-}
-
-// Data associated with a distribution metric.
-// This is based off of the current DistributionData metric.
-// This is not a stackdriver or dropwizard compatible
-// style of distribution metric.
-message DistributionData {
-  oneof distribution {
-    IntDistributionData int_distribution_data = 1;
-    DoubleDistributionData double_distribution_data = 2;
-  }
-}
-
-message IntDistributionData {
-  int64 count = 1;
-  int64 sum = 2;
-  int64 min = 3;
-  int64 max = 4;
-}
-
-message DoubleDistributionData {
-  int64 count = 1;
-  double sum = 2;
-  double min = 3;
-  double max = 4;
-}
-
-// General MonitoredState information which contains
-// structured information which does not fit into a typical
-// metric format. For example, a table of important files
-// and metadata which an I/O source is reading.
-// Note: Since MonitoredState is designed to be
-// customizable, and allow engines to aggregate these
-// metrics in custom ways.
-// Engines can use custom aggregation functions for specific URNs
-// by inspecting the column values.
-// An SDK should always report its current state, that is all
-// relevant MonitoredState for its PTransform at the current moment
-// and this should be kept small.
-// For example, an SDK can emit the oldest three files which
-// have been waiting for data for over 1 hour.
-// If an engine supports the URN with a custom aggregation then
-// it can filter these and keep only the Top-3 rows based on
-// how long the files have been waiting for data.
-// Otherwise an engine can ignore the MonitoringTableData
-// or union all the rows together into one large table and display
-// them in a UI.
-message MonitoringTableData {
-  message MonitoringColumnValue {
-    oneof value {
-      int64 int64_value = 1;
-      double double_value = 2;
-      string string_value = 3;
-      google.protobuf.Timestamp timestamp = 4;
-    }
-  }
-
-  message MonitoringRow {
-    repeated MonitoringColumnValue values = 1;
-  }
-
-  // The number of column names must match the
-  // number of values in each MonitoringRow.
-  repeated string column_names = 1;
-  repeated MonitoringRow row_data = 2;
-}
-
 // DEPRECATED
 message Metrics {
   // PTransform level metrics.
@@ -723,7 +435,7 @@ message ProcessBundleProgressResponse {
 
   // (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
-  repeated MonitoringInfo monitoring_infos = 3;
+  repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3;
 
   // The list of currently active primary roots that are being
   // executed. Required to be populated for PTransforms which can be split.
diff --git a/model/job-management/src/main/proto/beam_job_api.proto b/model/job-management/src/main/proto/beam_job_api.proto
index cd2ddff..4a0ecea 100644
--- a/model/job-management/src/main/proto/beam_job_api.proto
+++ b/model/job-management/src/main/proto/beam_job_api.proto
@@ -33,7 +33,6 @@ import "beam_runner_api.proto";
 import "endpoints.proto";
 import "google/protobuf/struct.proto";
 
-
 // Job Service for running RunnerAPI pipelines
 service JobService {
   // Prepare a job for execution. The job will not be executed until a call is made to run with the
diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
new file mode 100644
index 0000000..822c992
--- /dev/null
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers for metrics classes, used in the Fn API, Job API, and by SDKs.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = "pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "MetricsApi";
+
+
+import "beam_runner_api.proto";
+import "google/protobuf/descriptor.proto";
+import "google/protobuf/timestamp.proto";
+
+// A specification containing required set of fields and labels required
+// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
+// ProcessBundleResponse reporting.
+message MonitoringInfoSpec {
+  string urn = 1;
+  string type_urn = 2;
+  // The list of required labels.
+  repeated string required_labels = 3;
+  // Extra non functional parts of the spec for descriptive purposes.
+  // i.e. description, units, etc.
+  repeated Annotation annotations = 4;
+}
+
+// The key name and value string of MonitoringInfo annotations.
+message Annotation {
+  string key = 1;
+  string value = 2;
+}
+
+// Populated MonitoringInfoSpecs for specific URNs.
+// Indicating the required fields to be set.
+// SDKs and RunnerHarnesses can load these instances into memory and write a
+// validator or code generator to assist with populating and validating
+// MonitoringInfo protos.
+message MonitoringInfoSpecs {
+  enum Enum {
+    // TODO(ajamato): Add the PTRANSFORM name as a required label after
+    // upgrading the python SDK.
+    USER_COUNTER = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:",
+      type_urn: "beam:metrics:sum_int_64",
+    }];
+
+    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:element_count:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PCOLLECTION" ],
+      annotations: [ {
+        key: "description",
+        value: "The total elements output to a Pcollection by a PTransform."
+      } ]
+    }];
+
+    START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {
+      urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [ {
+        key: "description",
+        value: "The total estimated execution time of the start bundle "
+               "function in a pardo"
+      } ]
+    }];
+
+    PROCESS_BUNDLE_MSECS = 3 [(monitoring_info_spec) = {
+      urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [ {
+        key: "description",
+        value: "The total estimated execution time of the process bundle "
+               "function in a pardo"
+      } ]
+    }];
+
+    FINISH_BUNDLE_MSECS = 4 [(monitoring_info_spec) = {
+      urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [ {
+        key: "description",
+        value: "The total estimated execution time of the finish bundle "
+               "function in a pardo"
+      } ]
+    }];
+
+    TOTAL_MSECS = 5 [(monitoring_info_spec) = {
+      urn: "beam:metric:ptransform_execution_time:total_msecs:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [ {
+        key: "description",
+        value: "The total estimated execution time of the ptransform"
+      } ]
+    }];
+  }
+}
+
+// A set of properties for the MonitoringInfoLabel, this is useful to obtain
+// the proper label string for the MonitoringInfoLabel.
+message MonitoringInfoLabelProps {
+  // The label key to use in the MonitoringInfo labels map.
+  string name = 1;
+}
+
+// Enum extension to store MonitoringInfo related
+// specifications, constants, etc.
+extend google.protobuf.EnumValueOptions {
+  MonitoringInfoLabelProps label_props = 127337796;  // From: commit 0x7970544.
+
+  // Enum extension to store the MonitoringInfoSpecs.
+  MonitoringInfoSpec monitoring_info_spec = 207174266;
+}
+
+message MonitoringInfo {
+  // The name defining the metric or monitored state.
+  string urn = 1;
+
+  // This is specified as a URN that implies:
+  // A message class: (Distribution, Counter, Extrema, MonitoringDataTable).
+  // Sub types like field formats - int64, double, string.
+  // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
+  // valid values are:
+  // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
+  //     sum_double|latest_double|top_n_double|bottom_n_double|
+  //     distribution_int_64|distribution_double|monitoring_data_table
+  string type = 2;
+
+  // The Metric or monitored state.
+  oneof data {
+    MonitoringTableData monitoring_table_data = 3;
+    Metric metric = 4;
+  }
+
+  enum MonitoringInfoLabels {
+    // TODO(ajamato): Rename all references to TRANSFORM to PTRANSFORM
+    TRANSFORM = 0 [(label_props) = { name: "PTRANSFORM" }];
+    PCOLLECTION = 1 [(label_props) = { name: "PCOLLECTION" }];
+    WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
+    CODER = 3 [(label_props) = { name: "CODER" }];
+    ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
+  }
+  // A set of key+value labels which define the scope of the metric.
+  // Either a well defined entity id for matching the enum names in
+  // the MonitoringInfoLabels enum or any arbitrary label
+  // set by a custom metric or user metric.
+  // A monitoring system is expected to be able to aggregate the metrics
+  // together for all updates having the same URN and labels. Some systems such
+  // as Stackdriver will be able to aggregate the metrics using a subset of the
+  // provided labels
+  map<string, string> labels = 5;
+
+  // The walltime of the most recent update.
+  // Useful for aggregation for latest types such as LatestInt64.
+  google.protobuf.Timestamp timestamp = 6;
+}
+
+message MonitoringInfoUrns {
+  enum Enum {
+    // User counters have this format: 'beam:metric:user:<namespace>:<name>'.
+    USER_COUNTER_URN_PREFIX = 0
+        [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metric:user:"];
+
+    ELEMENT_COUNT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                           "beam:metric:element_count:v1"];
+
+    START_BUNDLE_MSECS = 2
+        [(org.apache.beam.model.pipeline.v1.beam_urn) =
+             "beam:metric:pardo_execution_time:start_bundle_msecs:v1"];
+
+    PROCESS_BUNDLE_MSECS = 3
+        [(org.apache.beam.model.pipeline.v1.beam_urn) =
+             "beam:metric:pardo_execution_time:process_bundle_msecs:v1"];
+
+    FINISH_BUNDLE_MSECS = 4
+        [(org.apache.beam.model.pipeline.v1.beam_urn) =
+             "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"];
+
+    TOTAL_MSECS = 5
+        [(org.apache.beam.model.pipeline.v1.beam_urn) =
+             "beam:metric:ptransform_execution_time:total_msecs:v1"];
+  }
+}
+
+message MonitoringInfoTypeUrns {
+  enum Enum {
+    SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                            "beam:metrics:sum_int_64"];
+
+    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                                     "beam:metrics:distribution_int_64"];
+
+    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+                               "beam:metrics:latest_int_64"];
+  }
+}
+
+message Metric {
+  // (Required) The data for this metric.
+  oneof data {
+    CounterData counter_data = 1;
+    DistributionData distribution_data = 2;
+    ExtremaData extrema_data = 3;
+  }
+}
+
+// Data associated with a Counter or Gauge metric.
+// This is designed to be compatible with metric collection
+// systems such as DropWizard.
+message CounterData {
+  oneof value {
+    int64 int64_value = 1;
+    double double_value = 2;
+    string string_value = 3;
+  }
+}
+
+// Extrema messages are used for calculating
+// Top-N/Bottom-N metrics.
+message ExtremaData {
+  oneof extrema {
+    IntExtremaData int_extrema_data = 1;
+    DoubleExtremaData double_extrema_data = 2;
+  }
+}
+
+message IntExtremaData {
+  repeated int64 int_values = 1;
+}
+
+message DoubleExtremaData {
+  repeated double double_values = 2;
+}
+
+// Data associated with a distribution metric.
+// This is based off of the current DistributionData metric.
+// This is not a stackdriver or dropwizard compatible
+// style of distribution metric.
+message DistributionData {
+  oneof distribution {
+    IntDistributionData int_distribution_data = 1;
+    DoubleDistributionData double_distribution_data = 2;
+  }
+}
+
+message IntDistributionData {
+  int64 count = 1;
+  int64 sum = 2;
+  int64 min = 3;
+  int64 max = 4;
+}
+
+message DoubleDistributionData {
+  int64 count = 1;
+  double sum = 2;
+  double min = 3;
+  double max = 4;
+}
+
+// General MonitoredState information which contains
+// structured information which does not fit into a typical
+// metric format. For example, a table of important files
+// and metadata which an I/O source is reading.
+// Note: MonitoredState is designed to be
+// customizable and to allow engines to aggregate these
+// metrics in custom ways.
+// Engines can use custom aggregation functions for specific URNs
+// by inspecting the column values.
+// An SDK should always report its current state, that is all
+// relevant MonitoredState for its PTransform at the current moment
+// and this should be kept small.
+// For example, an SDK can emit the oldest three files which
+// have been waiting for data for over 1 hour.
+// If an engine supports the URN with a custom aggregation then
+// it can filter these and keep only the Top-3 rows based on
+// how long the files have been waiting for data.
+// Otherwise an engine can ignore the MonitoringTableData
+// or union all the rows together into one large table and display
+// them in a UI.
+message MonitoringTableData {
+  message MonitoringColumnValue {
+    oneof value {
+      int64 int64_value = 1;
+      double double_value = 2;
+      string string_value = 3;
+      google.protobuf.Timestamp timestamp = 4;
+    }
+  }
+
+  message MonitoringRow {
+    repeated MonitoringColumnValue values = 1;
+  }
+
+  // The number of column names must match the
+  // number of values in each MonitoringRow.
+  repeated string column_names = 1;
+  repeated MonitoringRow row_data = 2;
+}
+
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
index 6b40603..ba47812 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.metrics.MetricName;
 public class MetricUrns {
   /**
    * Parse a {@link MetricName} from a {@link
-   * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum}.
+   * org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoUrns.Enum}.
    *
    * <p>Should be consistent with {@code parse_namespace_and_name} in monitoring_infos.py.
    */
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 3f81827..68fd86d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map.Entry;
 import javax.annotation.Nullable;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.construction.metrics.MetricKey;
 import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.annotations.Experimental;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
index 6480a9d..1e46ebb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.construction.metrics.MetricKey;
 import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricResult;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
index c39f15d..92893d9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -17,17 +17,19 @@
  */
 package org.apache.beam.runners.core.metrics;
 
+import static org.apache.beam.model.pipeline.v1.MetricsApi.labelProps;
+import static org.apache.beam.model.pipeline.v1.MetricsApi.monitoringInfoSpec;
+
 import java.time.Instant;
 import java.util.HashMap;
 import javax.annotation.Nullable;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo.MonitoringInfoLabels;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoLabelProps;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpec;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoTypeUrns;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo.MonitoringInfoLabels;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoLabelProps;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpec;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoTypeUrns;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoUrns;
 import org.apache.beam.runners.core.construction.BeamUrns;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -91,7 +93,7 @@ public class SimpleMonitoringInfoBuilder {
       // the proto files.
       if (!val.name().equals("UNRECOGNIZED")) {
         MonitoringInfoSpec spec =
-            val.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec);
+            val.getValueDescriptor().getOptions().getExtension(monitoringInfoSpec);
         SimpleMonitoringInfoBuilder.specs.put(spec.getUrn(), spec);
       }
     }
@@ -100,7 +102,7 @@ public class SimpleMonitoringInfoBuilder {
   /** Returns the label string constant defined in the MonitoringInfoLabel enum proto. */
   private static String getLabelString(MonitoringInfoLabels label) {
     MonitoringInfoLabelProps props =
-        label.getValueDescriptor().getOptions().getExtension(BeamFnApi.labelProps);
+        label.getValueDescriptor().getOptions().getExtension(labelProps);
     return props.getName();
   }
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleStateRegistry.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleStateRegistry.java
index 224d6e2..19f2791 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleStateRegistry.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleStateRegistry.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.core.metrics;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 
 /**
  * A Class for registering SimpleExecutionStates with and extracting execution time MonitoringInfos.
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
index b0acbc8..8ac8bd6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
@@ -17,14 +17,15 @@
  */
 package org.apache.beam.runners.core.metrics;
 
+import static org.apache.beam.model.pipeline.v1.MetricsApi.monitoringInfoSpec;
+
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpec;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpec;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs;
 
 /** Class implements validation of MonitoringInfos against MonitoringInfoSpecs. */
 public class SpecMonitoringInfoValidator {
@@ -35,8 +36,7 @@ public class SpecMonitoringInfoValidator {
         Arrays.stream(MonitoringInfoSpecs.Enum.values())
             // Filtering default value for "unknown" Enums. Coming from proto implementation.
             .filter(x -> !x.name().equals("UNRECOGNIZED"))
-            .map(
-                x -> x.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec))
+            .map(x -> x.getValueDescriptor().getOptions().getExtension(monitoringInfoSpec))
             .toArray(size -> new MonitoringInfoSpec[size]);
   }
 
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 0636167..aa74ff2 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
index 901ab25..bec18ae 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
@@ -28,7 +28,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.DistributionResult;
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
index 62fae47..f3455df 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMatchers.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.core.metrics;
 
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
 
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
index ed64f76..1b337ee 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.core.metrics;
 
 import java.util.HashMap;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 
 /**
  * Provides convenient one line factories for unit tests that need to generate test MonitoringInfos.
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
index 14fa703..18e7829 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
@@ -21,7 +21,7 @@ import static junit.framework.TestCase.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
index 3e2671d..ac42caf 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertThat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.Test;
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
index 7cff0a0..bffcd8a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.core.metrics;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index 23d1f83..93f4c2b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -23,12 +23,12 @@ import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAtt
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.DistributionData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.ExtremaData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.Distribution;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
index 6b9435e..ade8960 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
-import static org.apache.beam.model.fnexecution.v1.BeamFnApi.labelProps;
+import static org.apache.beam.model.pipeline.v1.MetricsApi.labelProps;
 import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN;
 import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX;
 import static org.hamcrest.CoreMatchers.is;
@@ -30,13 +30,13 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.DoubleDistributionData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo.MonitoringInfoLabels;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.DoubleDistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo.MonitoringInfoLabels;
 import org.apache.beam.runners.core.metrics.CounterCell;
 import org.apache.beam.runners.core.metrics.DistributionCell;
 import org.apache.beam.runners.core.metrics.DistributionData;
@@ -193,7 +193,7 @@ public class FlinkMetricContainerTest {
             .setMetric(
                 Metric.newBuilder()
                     .setDistributionData(
-                        BeamFnApi.DistributionData.newBuilder()
+                        MetricsApi.DistributionData.newBuilder()
                             .setIntDistributionData(
                                 IntDistributionData.newBuilder()
                                     .setSum(30)
@@ -209,7 +209,7 @@ public class FlinkMetricContainerTest {
             .setMetric(
                 Metric.newBuilder()
                     .setDistributionData(
-                        BeamFnApi.DistributionData.newBuilder()
+                        MetricsApi.DistributionData.newBuilder()
                             .setDoubleDistributionData(
                                 DoubleDistributionData.newBuilder()
                                     .setSum(30)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index bff2d21..1da0af8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -43,8 +43,8 @@ import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.construction.metrics.MetricKey;
 import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
index 8cc0ec9..8990e3c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow.worker.fn.control;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java
index 45cd65b..2a765b1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java
@@ -24,7 +24,7 @@ import com.google.api.services.dataflow.model.CounterUpdate;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
index 974fc96..024aa95 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.dataflow.worker.fn.control;
 
 import com.google.api.services.dataflow.model.CounterUpdate;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 
 interface MonitoringInfoToCounterUpdateTransformer {
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index ebaa5d8..6518555 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -32,7 +32,6 @@ import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
@@ -44,6 +43,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StateNamespaces;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
index 8d41a6d..7f31c48 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -78,7 +78,7 @@ public class TimerReceiver {
     ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor =
         stageBundleFactory.getProcessBundleDescriptor();
 
-    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+    ProcessBundleDescriptor processBundleDescriptor =
         executableProcessBundleDescriptor.getProcessBundleDescriptor();
 
     // Create and cache lookups so that we don't have to dive into the ProcessBundleDescriptor
@@ -203,7 +203,7 @@ public class TimerReceiver {
 
   // Retrieves all window coders for all TimerSpecs.
   private static Map<String, Coder<BoundedWindow>> createTimerWindowCodersMap(
-      BeamFnApi.ProcessBundleDescriptor processBundleDescriptor,
+      ProcessBundleDescriptor processBundleDescriptor,
       Map<String, ProcessBundleDescriptors.TimerSpec> timerIdToTimerSpecMap,
       RunnerApi.Components components) {
     Map<String, Coder<BoundedWindow>> timerWindowCodersMap = new HashMap<>();
@@ -242,7 +242,7 @@ public class TimerReceiver {
   private static RunnerApi.Coder getTimerWindowingCoder(
       RunnerApi.PTransform pTransform,
       String timerId,
-      BeamFnApi.ProcessBundleDescriptor processBundleDescriptor) {
+      ProcessBundleDescriptor processBundleDescriptor) {
     String timerPCollectionId = pTransform.getInputsMap().get(timerId);
     RunnerApi.PCollection timerPCollection =
         processBundleDescriptor.getPcollectionsMap().get(timerPCollectionId);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
index b3c9353..cb65456 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
@@ -17,15 +17,16 @@
  */
 package org.apache.beam.runners.dataflow.worker.fn.control;
 
+import static org.apache.beam.model.pipeline.v1.MetricsApi.monitoringInfoSpec;
+
 import com.google.api.services.dataflow.model.CounterMetadata;
 import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs.Enum;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs.Enum;
 import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin;
@@ -54,11 +55,7 @@ class UserMonitoringInfoToCounterUpdateTransformer
   }
 
   static final String BEAM_METRICS_USER_PREFIX =
-      Enum.USER_COUNTER
-          .getValueDescriptor()
-          .getOptions()
-          .getExtension(BeamFnApi.monitoringInfoSpec)
-          .getUrn();
+      Enum.USER_COUNTER.getValueDescriptor().getOptions().getExtension(monitoringInfoSpec).getUrn();
 
   private Optional<String> validate(MonitoringInfo monitoringInfo) {
     Optional<String> validatorResult = specValidator.validate(monitoringInfo);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index f38d31f..02ac1ba 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -36,6 +36,10 @@ import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
@@ -92,11 +96,11 @@ public class BeamFnMapTaskExecutorTest {
   private static final String FAKE_OUTPUT_NAME = "fake_output_name";
   private static final String FAKE_OUTPUT_PCOLLECTION_ID = "fake_pcollection_id";
 
-  private static final BeamFnApi.Metrics.PTransform FAKE_ELEMENT_COUNT_METRICS =
-      BeamFnApi.Metrics.PTransform.newBuilder()
+  private static final Metrics.PTransform FAKE_ELEMENT_COUNT_METRICS =
+      Metrics.PTransform.newBuilder()
           .setProcessedElements(
-              BeamFnApi.Metrics.PTransform.ProcessedElements.newBuilder()
-                  .setMeasured(BeamFnApi.Metrics.PTransform.Measured.getDefaultInstance()))
+              Metrics.PTransform.ProcessedElements.newBuilder()
+                  .setMeasured(Metrics.PTransform.Measured.getDefaultInstance()))
           .build();
 
   private static final BeamFnApi.RegisterRequest REGISTER_REQUEST =
@@ -122,11 +126,8 @@ public class BeamFnMapTaskExecutorTest {
     final CountDownLatch progressSentLatch = new CountDownLatch(1);
     final CountDownLatch processBundleLatch = new CountDownLatch(1);
 
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
+    final Metrics.User.MetricName metricName =
+        Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
 
     InstructionRequestHandler instructionRequestHandler =
         new InstructionRequestHandler() {
@@ -150,17 +151,16 @@ public class BeamFnMapTaskExecutorTest {
                         .setProcessBundleProgress(
                             BeamFnApi.ProcessBundleProgressResponse.newBuilder()
                                 .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
+                                    Metrics.newBuilder()
                                         .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
                                         .putPtransforms(
                                             stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
+                                            Metrics.PTransform.newBuilder()
                                                 .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
+                                                    Metrics.User.newBuilder()
                                                         .setMetricName(metricName)
                                                         .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
+                                                            Metrics.User.CounterData.newBuilder()
                                                                 .setValue(counterValue)))
                                                 .build())))
                         .build());
@@ -222,11 +222,8 @@ public class BeamFnMapTaskExecutorTest {
     final CountDownLatch progressSentTwiceLatch = new CountDownLatch(2);
     final CountDownLatch processBundleLatch = new CountDownLatch(1);
 
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
+    final Metrics.User.MetricName metricName =
+        Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
 
     InstructionRequestHandler instructionRequestHandler =
         new InstructionRequestHandler() {
@@ -250,17 +247,16 @@ public class BeamFnMapTaskExecutorTest {
                         .setProcessBundleProgress(
                             BeamFnApi.ProcessBundleProgressResponse.newBuilder()
                                 .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
+                                    Metrics.newBuilder()
                                         .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
                                         .putPtransforms(
                                             stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
+                                            Metrics.PTransform.newBuilder()
                                                 .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
+                                                    Metrics.User.newBuilder()
                                                         .setMetricName(metricName)
                                                         .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
+                                                            Metrics.User.CounterData.newBuilder()
                                                                 .setValue(
                                                                     progressSentTwiceLatch
                                                                                 .getCount()
@@ -328,11 +324,8 @@ public class BeamFnMapTaskExecutorTest {
     final CountDownLatch progressSentLatch = new CountDownLatch(1);
     final CountDownLatch processBundleLatch = new CountDownLatch(1);
 
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
+    final Metrics.User.MetricName metricName =
+        Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
 
     InstructionRequestHandler instructionRequestHandler =
         new InstructionRequestHandler() {
@@ -349,17 +342,16 @@ public class BeamFnMapTaskExecutorTest {
                           .setProcessBundle(
                               BeamFnApi.ProcessBundleResponse.newBuilder()
                                   .setMetrics(
-                                      BeamFnApi.Metrics.newBuilder()
+                                      Metrics.newBuilder()
                                           .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
                                           .putPtransforms(
                                               stepName,
-                                              BeamFnApi.Metrics.PTransform.newBuilder()
+                                              Metrics.PTransform.newBuilder()
                                                   .addUser(
-                                                      BeamFnApi.Metrics.User.newBuilder()
+                                                      Metrics.User.newBuilder()
                                                           .setMetricName(metricName)
                                                           .setCounterData(
-                                                              BeamFnApi.Metrics.User.CounterData
-                                                                  .newBuilder()
+                                                              Metrics.User.CounterData.newBuilder()
                                                                   .setValue(finalCounterValue)))
                                                   .build())))
                           .build();
@@ -371,17 +363,16 @@ public class BeamFnMapTaskExecutorTest {
                         .setProcessBundleProgress(
                             BeamFnApi.ProcessBundleProgressResponse.newBuilder()
                                 .setMetrics(
-                                    BeamFnApi.Metrics.newBuilder()
+                                    Metrics.newBuilder()
                                         .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
                                         .putPtransforms(
                                             stepName,
-                                            BeamFnApi.Metrics.PTransform.newBuilder()
+                                            Metrics.PTransform.newBuilder()
                                                 .addUser(
-                                                    BeamFnApi.Metrics.User.newBuilder()
+                                                    Metrics.User.newBuilder()
                                                         .setMetricName(metricName)
                                                         .setCounterData(
-                                                            BeamFnApi.Metrics.User.CounterData
-                                                                .newBuilder()
+                                                            Metrics.User.CounterData.newBuilder()
                                                                 .setValue(counterValue)))
                                                 .build())))
                         .build());
@@ -447,37 +438,33 @@ public class BeamFnMapTaskExecutorTest {
     final CountDownLatch progressSentLatch = new CountDownLatch(1);
     final CountDownLatch processBundleLatch = new CountDownLatch(1);
 
-    final BeamFnApi.Metrics.User.MetricName metricName =
-        BeamFnApi.Metrics.User.MetricName.newBuilder()
-            .setNamespace(namespace)
-            .setName(name)
-            .build();
+    final Metrics.User.MetricName metricName =
+        Metrics.User.MetricName.newBuilder().setNamespace(namespace).setName(name).build();
 
-    final BeamFnApi.Metrics deprecatedMetrics =
-        BeamFnApi.Metrics.newBuilder()
+    final Metrics deprecatedMetrics =
+        Metrics.newBuilder()
             .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
             .putPtransforms(
                 stepName,
-                BeamFnApi.Metrics.PTransform.newBuilder()
+                Metrics.PTransform.newBuilder()
                     .addUser(
-                        BeamFnApi.Metrics.User.newBuilder()
+                        Metrics.User.newBuilder()
                             .setMetricName(metricName)
                             .setCounterData(
-                                BeamFnApi.Metrics.User.CounterData.newBuilder()
-                                    .setValue(finalCounterValue)))
+                                Metrics.User.CounterData.newBuilder().setValue(finalCounterValue)))
                     .build())
             .build();
 
     final int expectedCounterValue = 5;
-    final BeamFnApi.MonitoringInfo expectedMonitoringInfo =
-        BeamFnApi.MonitoringInfo.newBuilder()
+    final MonitoringInfo expectedMonitoringInfo =
+        MonitoringInfo.newBuilder()
             .setUrn("beam:metric:user:ExpectedCounter")
             .setType("beam:metrics:sum_int_64")
             .putLabels("PTRANSFORM", "ExpectedPTransform")
             .setMetric(
-                BeamFnApi.Metric.newBuilder()
+                Metric.newBuilder()
                     .setCounterData(
-                        BeamFnApi.CounterData.newBuilder()
+                        MetricsApi.CounterData.newBuilder()
                             .setInt64Value(expectedCounterValue)
                             .build())
                     .build())
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
index 44a526a..9d96ae2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
@@ -25,7 +25,7 @@ import com.google.api.services.dataflow.model.CounterUpdate;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
index fc74c0f..c221cee 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
@@ -27,7 +27,7 @@ import com.google.api.services.dataflow.model.CounterUpdate;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
index 5e5b10c..a401c90 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
@@ -27,7 +27,7 @@ import com.google.api.services.dataflow.model.CounterUpdate;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 30c5b90..e32cd79 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -43,10 +43,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.FnHarness;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index f54b8fd..038c45b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -39,7 +39,6 @@ import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
@@ -47,6 +46,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.Builder;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 3c6ea15..7bee753 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java
index 9c6a2f2..6b20fe5 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PTransformFunctionRegistry.java
@@ -21,7 +21,7 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index c5c5097..d10e993 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -37,8 +37,8 @@ import java.util.ServiceLoader;
 import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
 import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
 import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
index d88405a..5490495 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.withSettings;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index 65ca185..6dbc1af 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -37,6 +37,7 @@ from apache_beam.metrics.metricbase import Counter
 from apache_beam.metrics.metricbase import Distribution
 from apache_beam.metrics.metricbase import Gauge
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import metrics_pb2
 
 __all__ = ['DistributionResult', 'GaugeResult']
 
@@ -171,8 +172,8 @@ class CounterCell(Counter, MetricCell):
     # and Distributions. Since there is no CounterData class this method
     # was added to CounterCell. Consider adding a CounterData class or
     # removing the GaugeData and DistributionData classes.
-    return beam_fn_api_pb2.Metric(
-        counter_data=beam_fn_api_pb2.CounterData(
+    return metrics_pb2.Metric(
+        counter_data=metrics_pb2.CounterData(
             int64_value=self.get_cumulative()
         )
     )
@@ -404,8 +405,8 @@ class GaugeData(object):
 
   def to_runner_api_monitoring_info(self):
     """Returns a Metric with this value for use in a MonitoringInfo."""
-    return beam_fn_api_pb2.Metric(
-        counter_data=beam_fn_api_pb2.CounterData(
+    return metrics_pb2.Metric(
+        counter_data=metrics_pb2.CounterData(
             int64_value=self.value
         )
     )
@@ -478,9 +479,9 @@ class DistributionData(object):
 
   def to_runner_api_monitoring_info(self):
     """Returns a Metric with this value for use in a MonitoringInfo."""
-    return beam_fn_api_pb2.Metric(
-        distribution_data=beam_fn_api_pb2.DistributionData(
-            int_distribution_data=beam_fn_api_pb2.IntDistributionData(
+    return metrics_pb2.Metric(
+        distribution_data=metrics_pb2.DistributionData(
+            int_distribution_data=metrics_pb2.IntDistributionData(
                 count=self.count, sum=self.sum, min=self.min, max=self.max)))
 
 
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 94106c6..2e9059f 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -31,9 +31,9 @@ from apache_beam.metrics.cells import DistributionResult
 from apache_beam.metrics.cells import GaugeData
 from apache_beam.metrics.cells import GaugeResult
 from apache_beam.portability import common_urns
-from apache_beam.portability.api.beam_fn_api_pb2 import CounterData
-from apache_beam.portability.api.beam_fn_api_pb2 import Metric
-from apache_beam.portability.api.beam_fn_api_pb2 import MonitoringInfo
+from apache_beam.portability.api.metrics_pb2 import CounterData
+from apache_beam.portability.api.metrics_pb2 import Metric
+from apache_beam.portability.api.metrics_pb2 import MonitoringInfo
 
 ELEMENT_COUNT_URN = common_urns.monitoring_infos.ELEMENT_COUNT.urn
 START_BUNDLE_MSECS_URN = common_urns.monitoring_infos.START_BUNDLE_MSECS.urn
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 1cee4a8..8c60b6b 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -591,6 +591,7 @@ class DebugOptions(PipelineOptions):
          'before enabling any experiments.'))
 
   def add_experiment(self, experiment):
+    # pylint: disable=access-member-before-definition
     if self.experiments is None:
       self.experiments = []
     if experiment not in self.experiments:
diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py
index a000f81..edbb681 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -21,8 +21,8 @@ from __future__ import absolute_import
 
 from builtins import object
 
-from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.portability.api import metrics_pb2
 from apache_beam.portability.api import standard_window_fns_pb2
 
 
@@ -78,6 +78,6 @@ session_windows = PropertiesFromPayloadType(
     standard_window_fns_pb2.SessionsPayload)
 
 monitoring_infos = PropertiesFromEnumType(
-    beam_fn_api_pb2.MonitoringInfoUrns.Enum)
+    metrics_pb2.MonitoringInfoUrns.Enum)
 monitoring_info_types = PropertiesFromEnumType(
-    beam_fn_api_pb2.MonitoringInfoTypeUrns.Enum)
+    metrics_pb2.MonitoringInfoTypeUrns.Enum)


Mime
View raw message