beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-3042) Add tracking of bytes read / time spent when reading side inputs
Date Wed, 06 Dec 2017 22:36:01 GMT

    [ https://issues.apache.org/jira/browse/BEAM-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281044#comment-16281044 ]

ASF GitHub Bot commented on BEAM-3042:
--------------------------------------

chamikaramj closed pull request #4222: [BEAM-3042] Updating Dataflow Api protos
URL: https://github.com/apache/beam/pull/4222
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
index b0d4e44816c..6c55d4e830f 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
@@ -16,7 +16,6 @@
 #
 
 """Generated message classes for dataflow version v1b3.
-
 Develops and executes data processing patterns like ETL, batch computation,
 and continuous computation.
 """
@@ -347,11 +346,19 @@ class CounterStructuredName(_messages.Message):
       workers.
     executionStepName: Name of the stage. An execution step contains multiple
       component steps.
+    inputIndex: Index of an input collection that's being read from/written to
+      as a side input. The index identifies a step's side inputs starting by 1
+      (e.g. the first side input has input_index 1, the third has input_index
+      3). Side inputs are identified by a pair of (original_step_name,
+      input_index). This field helps uniquely identify them.
     name: Counter name. Not necessarily globally-unique, but unique within the
       context of the other fields. Required.
     origin: One of the standard Origins defined above.
     originNamespace: A string containing a more specific namespace of the
       counter's origin.
+    originalRequestingStepName: The step name requesting an operation, such as
+      GBK. I.e. the ParDo causing a read/write from shuffle to occur, or a
+      read from side inputs.
     originalStepName: System generated name of the original step in the user's
       graph, before optimization.
     portion: Portion of this counter, either key or value.
@@ -382,12 +389,14 @@ class PortionValueValuesEnum(_messages.Enum):
 
   componentStepName = _messages.StringField(1)
   executionStepName = _messages.StringField(2)
-  name = _messages.StringField(3)
-  origin = _messages.EnumField('OriginValueValuesEnum', 4)
-  originNamespace = _messages.StringField(5)
-  originalStepName = _messages.StringField(6)
-  portion = _messages.EnumField('PortionValueValuesEnum', 7)
-  workerId = _messages.StringField(8)
+  inputIndex = _messages.IntegerField(3, variant=_messages.Variant.INT32)
+  name = _messages.StringField(4)
+  origin = _messages.EnumField('OriginValueValuesEnum', 5)
+  originNamespace = _messages.StringField(6)
+  originalRequestingStepName = _messages.StringField(7)
+  originalStepName = _messages.StringField(8)
+  portion = _messages.EnumField('PortionValueValuesEnum', 9)
+  workerId = _messages.StringField(10)
 
 
 class CounterStructuredNameAndMetadata(_messages.Message):
@@ -1401,8 +1410,7 @@ class DistributionUpdate(_messages.Message):
 
   Fields:
     count: The count of the number of elements present in the distribution.
-    logBuckets: (Optional) Logarithmic histogram of values. Each log may be in
-      no more than one bucket. Order does not matter.
+    histogram: (Optional) Histogram of value counts for the distribution.
     max: The maximum value present in the distribution.
     min: The minimum value present in the distribution.
     sum: Use an int64 since we'd prefer the added precision. If overflow is a
@@ -1412,7 +1420,7 @@ class DistributionUpdate(_messages.Message):
   """
 
   count = _messages.MessageField('SplitInt64', 1)
-  logBuckets = _messages.MessageField('LogBucket', 2, repeated=True)
+  histogram = _messages.MessageField('Histogram', 2)
   max = _messages.MessageField('SplitInt64', 3)
   min = _messages.MessageField('SplitInt64', 4)
   sum = _messages.MessageField('SplitInt64', 5)
@@ -1808,6 +1816,27 @@ class GetTemplateResponse(_messages.Message):
   status = _messages.MessageField('Status', 2)
 
 
+class Histogram(_messages.Message):
+  """Histogram of value counts for a distribution.  Buckets have an inclusive
+  lower bound and exclusive upper bound and use "1,2,5 bucketing": The first
+  bucket range is from [0,1) and all subsequent bucket boundaries are powers
+  of ten multiplied by 1, 2, or 5. Thus, bucket boundaries are 0, 1, 2, 5, 10,
+  20, 50, 100, 200, 500, 1000, ... Negative values are not supported.
+
+  Fields:
+    bucketCounts: Counts of values in each bucket. For efficiency, prefix and
+      trailing buckets with count = 0 are elided. Buckets can store the full
+      range of values of an unsigned long, with ULLONG_MAX falling into the
+      59th bucket with range [1e19, 2e19).
+    firstBucketOffset: Starting index of first stored bucket. The non-
+      inclusive upper-bound of the ith bucket is given by:
+      pow(10,(i-first_bucket_offset)/3) * (1,2,5)[(i-first_bucket_offset)%3]
+  """
+
+  bucketCounts = _messages.IntegerField(1, repeated=True)
+  firstBucketOffset = _messages.IntegerField(2, variant=_messages.Variant.INT32)
+
+
 class InstructionInput(_messages.Message):
   """An input of an instruction, as a reference to an output of a producer
   instruction.
@@ -2493,20 +2522,6 @@ class ListJobsResponse(_messages.Message):
   nextPageToken = _messages.StringField(3)
 
 
-class LogBucket(_messages.Message):
-  """Bucket of values for Distribution's logarithmic histogram.
-
-  Fields:
-    count: Number of values in this bucket.
-    log: floor(log2(value)); defined to be zero for nonpositive values.
-      log(-1) = 0   log(0) = 0   log(1) = 0   log(2) = 1   log(3) = 1   log(4)
-      = 2   log(5) = 2
-  """
-
-  count = _messages.IntegerField(1)
-  log = _messages.IntegerField(2, variant=_messages.Variant.INT32)
-
-
 class MapTask(_messages.Message):
   """MapTask consists of an ordered set of instructions, each of which
   describes one particular low-level operation for the worker to perform in
@@ -3068,6 +3083,7 @@ class ResourceUtilizationReportResponse(_messages.Message):
   """
 
 
+
 class RuntimeEnvironment(_messages.Message):
   """The environment values to set at runtime.
 
@@ -3501,11 +3517,22 @@ class SourceOperationRequest(_messages.Message):
 
   Fields:
     getMetadata: Information about a request to get metadata about a source.
+    name: User-provided name of the Read instruction for this source.
+    originalName: System-defined name for the Read instruction for this source
+      in the original workflow graph.
     split: Information about a request to split a source.
+    stageName: System-defined name of the stage containing the source
+      operation. Unique across the workflow.
+    systemName: System-defined name of the Read instruction for this source.
+      Unique across the workflow.
   """
 
   getMetadata = _messages.MessageField('SourceGetMetadataRequest', 1)
-  split = _messages.MessageField('SourceSplitRequest', 2)
+  name = _messages.StringField(2)
+  originalName = _messages.StringField(3)
+  split = _messages.MessageField('SourceSplitRequest', 4)
+  stageName = _messages.StringField(5)
+  systemName = _messages.StringField(6)
 
 
 class SourceOperationResponse(_messages.Message):
@@ -4426,6 +4453,8 @@ class WorkItemStatus(_messages.Message):
       progress and proposed_stop_position should be interpreted relative to P,
       and in a potential subsequent dynamic_source_split into {P', R'}, P' and
       R' must be together equivalent to P, etc.
+    totalThrottlerWaitTimeSeconds: Total time the worker spent being throttled
+      by external systems.
     workItemId: Identifies the WorkItem.
   """
 
@@ -4441,7 +4470,8 @@ class WorkItemStatus(_messages.Message):
   sourceFork = _messages.MessageField('SourceFork', 10)
   sourceOperationResponse = _messages.MessageField('SourceOperationResponse', 11)
   stopPosition = _messages.MessageField('Position', 12)
-  workItemId = _messages.StringField(13)
+  totalThrottlerWaitTimeSeconds = _messages.FloatField(13)
+  workItemId = _messages.StringField(14)
 
 
 class WorkerHealthReport(_messages.Message):
@@ -4532,6 +4562,7 @@ class WorkerMessage(_messages.Message):
     workerHealthReport: The health of a worker.
     workerMessageCode: A worker message code.
     workerMetrics: Resource metrics reported by workers.
+    workerShutdownNotice: Shutdown notice by workers.
   """
 
   @encoding.MapUnrecognizedFields('additionalProperties')
@@ -4568,6 +4599,7 @@ class AdditionalProperty(_messages.Message):
   workerHealthReport = _messages.MessageField('WorkerHealthReport', 3)
   workerMessageCode = _messages.MessageField('WorkerMessageCode', 4)
   workerMetrics = _messages.MessageField('ResourceUtilizationReport', 5)
+  workerShutdownNotice = _messages.MessageField('WorkerShutdownNotice', 6)
 
 
 class WorkerMessageCode(_messages.Message):
@@ -4664,10 +4696,13 @@ class WorkerMessageResponse(_messages.Message):
       report.
     workerMetricsResponse: Service's response to reporting worker metrics
       (currently empty).
+    workerShutdownNoticeResponse: Service's response to shutdown notice
+      (currently empty).
   """
 
   workerHealthReportResponse = _messages.MessageField('WorkerHealthReportResponse', 1)
   workerMetricsResponse = _messages.MessageField('ResourceUtilizationReportResponse', 2)
+  workerShutdownNoticeResponse = _messages.MessageField('WorkerShutdownNoticeResponse', 3)
 
 
 class WorkerPool(_messages.Message):
@@ -4913,6 +4948,24 @@ class WorkerSettings(_messages.Message):
   workerId = _messages.StringField(6)
 
 
+class WorkerShutdownNotice(_messages.Message):
+  """Shutdown notification from workers. This is to be sent by the shutdown
+  script of the worker VM so that the backend knows that the VM is being shut
+  down.
+
+  Fields:
+    reason: The reason for the worker shutdown. Current possible values are:
+      "UNKNOWN": shutdown reason is unknown.   "PREEMPTION": shutdown reason
+      is preemption. Other possible reasons may be added in the future.
+  """
+
+  reason = _messages.StringField(1)
+
+
+class WorkerShutdownNoticeResponse(_messages.Message):
+  """Service-side response to WorkerMessage issuing shutdown notice."""
+
+
 class WriteInstruction(_messages.Message):
   """An instruction that writes records. Takes one input, produces no outputs.
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add tracking of bytes read / time spent when reading side inputs
> ----------------------------------------------------------------
>
>                 Key: BEAM-3042
>                 URL: https://issues.apache.org/jira/browse/BEAM-3042
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>
> It is difficult for Dataflow users to understand how modifying a pipeline or data set
> can affect how much inter-transform IO is used in their job. The intent of this feature request
> is to help users understand how side inputs behave when they are consumed.
> This will allow users to understand how much time and how much data their pipeline uses
> to read/write to inter-transform IO. Users will also be able to modify their pipelines and
> understand how their changes affect these IO metrics.
> For further information, please review the internal Google doc go/insights-transform-io-design-doc.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message