beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3545) Fn API metrics in Go SDK harness
Date Wed, 28 Mar 2018 00:32:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3545?focusedWorklogId=85089&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85089
]

ASF GitHub Bot logged work on BEAM-3545:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Mar/18 00:31
            Start Date: 28/Mar/18 00:31
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #4899: [BEAM-3545] Go SDK UserCounters
URL: https://github.com/apache/beam/pull/4899
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
index cd513a48cff..f28d1254d42 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -61,6 +61,7 @@ import (
 	"fmt"
 	"log"
 	"regexp"
+	"strings"
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
@@ -101,10 +102,18 @@ var (
 // returns a PCollection of type string. Also, using named function transforms allows
 // for easy reuse, modular testing, and an improved monitoring experience.
 
-var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+var (
+	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+	empty   = beam.NewCounter("extract", "emptyLines")
+	lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
 
 // extractFn is a DoFn that emits the words in a given line.
-func extractFn(line string, emit func(string)) {
+func extractFn(ctx context.Context, line string, emit func(string)) {
+	lineLen.Update(ctx, int64(len(line)))
+	if len(strings.TrimSpace(line)) == 0 {
+		empty.Inc(ctx, 1)
+	}
 	for _, word := range wordRE.FindAllString(line, -1) {
 		emit(word)
 	}
diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go b/sdks/go/pkg/beam/core/metrics/metrics.go
new file mode 100644
index 00000000000..73ef73a36b6
--- /dev/null
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package metrics implements the Beam metrics API, described at
+// http://s.apache.org/beam-metrics-api
+//
+// Metrics in the Beam model are uniquely identified by a namespace, a name,
+// and the PTransform context in which they are used. Further, they are
+// reported as a delta against the bundle being processed, so that overcounting
+// doesn't occur if a bundle needs to be retried. Each metric is scoped to
+// their bundle, and ptransform.
+//
+// Cells (or metric cells) are defined for each Beam model metric
+// type, and the serve as concurrency safe storage of a given metric's values.
+// Proxys are exported values representing the metric, for use in user
+// ptransform code. They don't retain their cells, since they don't have
+// the context to be able to store them for export back to the pipeline runner.
+//
+// Metric cells aren't initialized until their first mutation, which
+// follows from the Beam model design, where metrics are only sent for a bundle
+// if they have changed. This is particularly convenient for distributions which
+// means their min and max fields can be set to the first value on creation
+// rather than have some marker of uninitialized state, which would otherwise
+// need to be checked for on every update.
+//
+// Metric values are implemented as lightweight proxies of the user provided
+// namespace and name. This allows them to be declared globally, and used in
+// any ParDo. Further, as per the design, they can be declared dynamically
+// at runtime.
+//
+// To handle reporting deltas on the metrics by bundle, metrics
+// are keyed by bundleID,PTransformID,namespace, and name, so metrics that
+// are identical except for bundles are treated as distinct, effectively
+// providing per bundle deltas, since a new value cell is used per bundle.
+package metrics
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+	"github.com/golang/protobuf/ptypes"
+)
+
+// Metric cells are named and scoped by ptransform, and bundle,
+// the latter two of which are only known at runtime. We propagate
+// the PTransformID and BundleID via a context.Context. Consequently
+// using metrics requires the PTransform have a context.Context
+// argument.
+
+type ctxKey string
+
+const bundleKey ctxKey = "beam:bundle"
+const ptransformKey ctxKey = "beam:ptransform"
+
+// SetBundleID sets the id of the current Bundle.
+func SetBundleID(ctx context.Context, id string) context.Context {
+	return context.WithValue(ctx, bundleKey, id)
+}
+
+// SetPTransformID sets the id of the current PTransform.
+func SetPTransformID(ctx context.Context, id string) context.Context {
+	return context.WithValue(ctx, ptransformKey, id)
+}
+
+func getContextKey(ctx context.Context, n name) key {
+	key := key{name: n, bundle: "(bundle id unset)", ptransform: "(ptransform id unset)"}
+	if id := ctx.Value(bundleKey); id != nil {
+		key.bundle = id.(string)
+	}
+	if id := ctx.Value(ptransformKey); id != nil {
+		key.ptransform = id.(string)
+	}
+	return key
+}
+
+// userMetric knows how to convert it's value to a Metrics_User proto.
+type userMetric interface {
+	toProto() *fnexecution_v1.Metrics_User
+}
+
+// name is a pair of strings identifying a specific metric.
+type name struct {
+	namespace, name string
+}
+
+func (n name) String() string {
+	return fmt.Sprintf("%s.%s", n.namespace, n.name)
+}
+
+func newName(ns, n string) name {
+	if len(n) == 0 || len(ns) == 0 {
+		panic(fmt.Sprintf("namespace and name are required to be non-empty, got %q and %q", ns,
n))
+	}
+	return name{namespace: ns, name: n}
+}
+
+type key struct {
+	name               name
+	bundle, ptransform string
+}
+
+var (
+	// mu protects access to store
+	mu sync.RWMutex
+	// store is a map of BundleIDs to PtransformIDs to userMetrics.
+	// it permits us to extract metric protos for runners per data Bundle, and
+	// per PTransform.
+	store = make(map[string]map[string]map[name]userMetric)
+
+	// We store the user path access to the cells in metric type segregated
+	// sync.Maps. Using sync.Maps lets metrics with disjoint keys have concurrent
+	// access to the cells, and using separate sync.Map per metric type
+	// simplifies code understanding, since each only contains a single type of
+	// cell.
+
+	// counters is a map[key]*counter
+	counters = sync.Map{}
+	// distributions is a map[key]*distribution
+	distributions = sync.Map{}
+	// gauges is a map[key]*gauge
+	gauges = sync.Map{}
+)
+
+// TODO(lostluck): 2018/03/05 Use a common internal beam now() instead, once that exists.
+var now = time.Now
+
+// Counter is a simple counter for incrementing and decrementing a value.
+type Counter struct {
+	name name
+}
+
+func (m Counter) String() string {
+	return fmt.Sprintf("Counter metric %s", m.name)
+}
+
+// NewCounter returns the Counter with the given namespace and name.
+func NewCounter(ns, n string) Counter {
+	mn := newName(ns, n)
+	return Counter{
+		name: mn,
+	}
+}
+
+// Inc increments the counter within the given PTransform context by v.
+func (m Counter) Inc(ctx context.Context, v int64) {
+	key := getContextKey(ctx, m.name)
+	cs := &counter{
+		value: v,
+	}
+	if m, loaded := counters.LoadOrStore(key, cs); loaded {
+		c := m.(*counter)
+		c.inc(v)
+	} else {
+		c := m.(*counter)
+		storeMetric(key, c)
+	}
+}
+
+// Dec decrements the counter within the given PTransform context by v.
+func (m Counter) Dec(ctx context.Context, v int64) {
+	m.Inc(ctx, -v)
+}
+
+// counter is a metric cell for counter values.
+type counter struct {
+	value int64
+	mu    sync.Mutex
+}
+
+func (m *counter) inc(v int64) {
+	m.mu.Lock()
+	m.value += v
+	m.mu.Unlock()
+}
+
+func (m *counter) String() string {
+	return fmt.Sprintf("value: %d", m.value)
+}
+
+// toProto returns a Metrics_User populated with the Data messages, but not the name. The
+// caller needs to populate with the metric's name.
+func (m *counter) toProto() *fnexecution_v1.Metrics_User {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return &fnexecution_v1.Metrics_User{
+		Data: &fnexecution_v1.Metrics_User_CounterData_{
+			CounterData: &fnexecution_v1.Metrics_User_CounterData{
+				Value: m.value,
+			},
+		},
+	}
+}
+
+// Distribution is a simple distribution of values.
+type Distribution struct {
+	name name
+}
+
+func (m Distribution) String() string {
+	return fmt.Sprintf("Distribution metric %s", m.name)
+}
+
+// NewDistribution returns the Distribution with the given namespace and name.
+func NewDistribution(ns, n string) Distribution {
+	mn := newName(ns, n)
+	return Distribution{
+		name: mn,
+	}
+}
+
+// Update updates the distribution within the given PTransform context with v.
+func (m Distribution) Update(ctx context.Context, v int64) {
+	key := getContextKey(ctx, m.name)
+	ds := &distribution{
+		count: 1,
+		sum:   v,
+		min:   v,
+		max:   v,
+	}
+	if m, loaded := distributions.LoadOrStore(key, ds); loaded {
+		d := m.(*distribution)
+		d.update(v)
+	} else {
+		d := m.(*distribution)
+		storeMetric(key, d)
+	}
+}
+
+// distribution is a metric cell for distribution values.
+type distribution struct {
+	count, sum, min, max int64
+	mu                   sync.Mutex
+}
+
+func (m *distribution) update(v int64) {
+	m.mu.Lock()
+	if v < m.min {
+		m.min = v
+	}
+	if v > m.max {
+		m.max = v
+	}
+	m.count++
+	m.sum += v
+	m.mu.Unlock()
+}
+
+func (m *distribution) String() string {
+	return fmt.Sprintf("count: %d sum: %d min: %d max: %d", m.count, m.sum, m.min, m.max)
+}
+
+// toProto returns a Metrics_User populated with the Data messages, but not the name. The
+// caller needs to populate with the metric's name.
+func (m *distribution) toProto() *fnexecution_v1.Metrics_User {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return &fnexecution_v1.Metrics_User{
+		Data: &fnexecution_v1.Metrics_User_DistributionData_{
+			DistributionData: &fnexecution_v1.Metrics_User_DistributionData{
+				Count: m.count,
+				Sum:   m.sum,
+				Min:   m.min,
+				Max:   m.max,
+			},
+		},
+	}
+}
+
+// Gauge is a time, value pair metric.
+type Gauge struct {
+	name name
+}
+
+func (m Gauge) String() string {
+	return fmt.Sprintf("Guage metric %s", m.name)
+}
+
+// NewGauge returns the Gauge with the given namespace and name.
+func NewGauge(ns, n string) Gauge {
+	mn := newName(ns, n)
+	return Gauge{
+		name: mn,
+	}
+}
+
+// Set sets the gauge to the given value, and associates it with the current time on the
clock.
+func (m Gauge) Set(ctx context.Context, v int64) {
+	key := getContextKey(ctx, m.name)
+	gs := &gauge{
+		t: now(),
+		v: v,
+	}
+	if m, loaded := gauges.LoadOrStore(key, gs); loaded {
+		g := m.(*gauge)
+		g.set(v)
+	} else {
+		g := m.(*gauge)
+		storeMetric(key, g)
+	}
+}
+
+// storeMetric stores a metric away on its first use so it may be retrieved later on.
+func storeMetric(key key, m userMetric) {
+	mu.Lock()
+	defer mu.Unlock()
+	if _, ok := store[key.bundle]; !ok {
+		store[key.bundle] = make(map[string]map[name]userMetric)
+	}
+	if _, ok := store[key.bundle][key.ptransform]; !ok {
+		store[key.bundle][key.ptransform] = make(map[name]userMetric)
+	}
+	if _, ok := store[key.bundle][key.ptransform][key.name]; ok {
+		panic(fmt.Sprintf("metric name %s being reused for a second metric in a single PTransform",
key.name))
+	}
+	store[key.bundle][key.ptransform][key.name] = m
+}
+
+// gauge is a metric cell for gauge values.
+type gauge struct {
+	mu sync.Mutex
+	t  time.Time
+	v  int64
+}
+
+func (m *gauge) set(v int64) {
+	m.mu.Lock()
+	m.t = now()
+	m.v = v
+	m.mu.Unlock()
+}
+
+func (m *gauge) toProto() *fnexecution_v1.Metrics_User {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	ts, err := ptypes.TimestampProto(m.t)
+	if err != nil {
+		panic(err)
+	}
+	return &fnexecution_v1.Metrics_User{
+		Data: &fnexecution_v1.Metrics_User_GaugeData_{
+			GaugeData: &fnexecution_v1.Metrics_User_GaugeData{
+				Value:     m.v,
+				Timestamp: ts,
+			},
+		},
+	}
+}
+
+func (m *gauge) String() string {
+	return fmt.Sprintf("time: %s value: %d", m.t, m.v)
+}
+
+// ToProto exports all collected metrics for the given BundleID and PTransform ID pair.
+func ToProto(b, pt string) []*fnexecution_v1.Metrics_User {
+	mu.RLock()
+	defer mu.RUnlock()
+	ps := store[b]
+	s := ps[pt]
+	var ret []*fnexecution_v1.Metrics_User
+	for n, m := range s {
+		p := m.toProto()
+		p.MetricName = &fnexecution_v1.Metrics_User_MetricName{
+			Name:      n.name,
+			Namespace: n.namespace,
+		}
+		ret = append(ret, p)
+	}
+	return ret
+}
+
+// DumpToLog is a debugging function that outputs all metrics available locally to beam.Log.
+func DumpToLog(ctx context.Context) {
+	dumpTo(func(format string, args ...interface{}) {
+		log.Errorf(ctx, format, args...)
+	})
+}
+
+// DumpToOut is a debugging function that outputs all metrics available locally to std out.
+func DumpToOut() {
+	dumpTo(func(format string, args ...interface{}) {
+		fmt.Printf(format+"\n", args...)
+	})
+}
+
+func dumpTo(p func(format string, args ...interface{})) {
+	mu.RLock()
+	defer mu.RUnlock()
+	var bs []string
+	for b := range store {
+		bs = append(bs, b)
+	}
+	sort.Strings(bs)
+	for _, b := range bs {
+		var pts []string
+		for pt := range store[b] {
+			pts = append(pts, pt)
+		}
+		sort.Strings(pts)
+		for _, pt := range pts {
+			var ns []name
+			for n := range store[b][pt] {
+				ns = append(ns, n)
+			}
+			sort.Slice(ns, func(i, j int) bool {
+				if ns[i].namespace < ns[j].namespace {
+					return true
+				}
+				if ns[i].namespace == ns[j].namespace && ns[i].name < ns[j].name {
+					return true
+				}
+				return false
+			})
+			p("Bundle: %q - PTransformID: %q", b, pt)
+			for _, n := range ns {
+				key := key{name: n, bundle: b, ptransform: pt}
+				if m, ok := counters.Load(key); ok {
+					p("\t%s - %s", key.name, m)
+				}
+				if m, ok := distributions.Load(key); ok {
+					p("\t%s - %s", key.name, m)
+				}
+				if m, ok := gauges.Load(key); ok {
+					p("\t%s - %s", key.name, m)
+				}
+			}
+		}
+	}
+}
+
+// Clear resets all storage associated with metrics for tests.
+// Calling this in pipeline code leads to inaccurate metrics.
+func Clear() {
+	mu.Lock()
+	store = make(map[string]map[string]map[name]userMetric)
+	counters = sync.Map{}
+	distributions = sync.Map{}
+	gauges = sync.Map{}
+	mu.Unlock()
+}
+
+// ClearBundleData removes stored references associated with a given bundle,
+// so it can be garbage collected.
+func ClearBundleData(b string) {
+	// No concurrency races since mu guards all access to store,
+	// and the metric cell sync.Maps are goroutine safe.
+	mu.Lock()
+	pts := store[b]
+	for pt, m := range pts {
+		for n := range m {
+			key := key{name: n, bundle: b, ptransform: pt}
+			counters.Delete(key)
+			distributions.Delete(key)
+			gauges.Delete(key)
+		}
+	}
+	delete(store, b)
+	mu.Unlock()
+}
diff --git a/sdks/go/pkg/beam/core/metrics/metrics_test.go b/sdks/go/pkg/beam/core/metrics/metrics_test.go
new file mode 100644
index 00000000000..80724df2580
--- /dev/null
+++ b/sdks/go/pkg/beam/core/metrics/metrics_test.go
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+)
+
+// bID is a bundleId to use in the tests, if nothing more specific is needed.
+const bID = "bID"
+
+func ctxWith(b, pt string) context.Context {
+	ctx := context.Background()
+	ctx = SetPTransformID(ctx, pt)
+	ctx = SetBundleID(ctx, b)
+	return ctx
+}
+
+func TestCounter_Inc(t *testing.T) {
+	tests := []struct {
+		ns, n, key string // Counter name and PTransform context
+		inc        int64
+		value      int64 // Internal variable to check
+	}{
+		{ns: "inc1", n: "count", key: "A", inc: 1, value: 1},
+		{ns: "inc1", n: "count", key: "A", inc: 1, value: 2},
+		{ns: "inc1", n: "ticker", key: "A", inc: 1, value: 1},
+		{ns: "inc1", n: "ticker", key: "A", inc: 2, value: 3},
+		{ns: "inc1", n: "count", key: "B", inc: 1, value: 1},
+		{ns: "inc1", n: "count", key: "B", inc: 1, value: 2},
+		{ns: "inc1", n: "ticker", key: "B", inc: 1, value: 1},
+		{ns: "inc1", n: "ticker", key: "B", inc: 2, value: 3},
+		{ns: "inc2", n: "count", key: "A", inc: 1, value: 1},
+		{ns: "inc2", n: "count", key: "A", inc: 1, value: 2},
+		{ns: "inc2", n: "ticker", key: "A", inc: 1, value: 1},
+		{ns: "inc2", n: "ticker", key: "A", inc: 2, value: 3},
+		{ns: "inc2", n: "count", key: "B", inc: 1, value: 1},
+		{ns: "inc2", n: "count", key: "B", inc: 1, value: 2},
+		{ns: "inc2", n: "ticker", key: "B", inc: 1, value: 1},
+		{ns: "inc2", n: "ticker", key: "B", inc: 2, value: 3},
+	}
+
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("add %d to %s.%s[%q] value: %d", test.inc, test.ns, test.n, test.key,
test.value),
+			func(t *testing.T) {
+				m := NewCounter(test.ns, test.n)
+				ctx := ctxWith(bID, test.key)
+				m.Inc(ctx, test.inc)
+
+				key := key{name: name{namespace: test.ns, name: test.n}, bundle: bID, ptransform: test.key}
+				q, ok := counters.Load(key)
+				if !ok {
+					t.Fatalf("Unable to find Counter for key %v", key)
+				}
+				c := q.(*counter)
+				if got, want := c.value, test.value; got != want {
+					t.Errorf("GetCounter(%q,%q).Inc(%s, %d) c.value got %v, want %v", test.ns, test.n, test.key,
test.inc, got, want)
+				}
+			})
+	}
+}
+
+func TestCounter_Dec(t *testing.T) {
+	tests := []struct {
+		ns, n, key string // Counter name and PTransform context
+		dec        int64
+		value      int64 // Internal variable to check
+	}{
+		{ns: "dec1", n: "count", key: "A", dec: 1, value: -1},
+		{ns: "dec1", n: "count", key: "A", dec: 1, value: -2},
+		{ns: "dec1", n: "ticker", key: "A", dec: 1, value: -1},
+		{ns: "dec1", n: "ticker", key: "A", dec: 2, value: -3},
+		{ns: "dec1", n: "count", key: "B", dec: 1, value: -1},
+		{ns: "dec1", n: "count", key: "B", dec: 1, value: -2},
+		{ns: "dec1", n: "ticker", key: "B", dec: 1, value: -1},
+		{ns: "dec1", n: "ticker", key: "B", dec: 2, value: -3},
+		{ns: "dec2", n: "count", key: "A", dec: 1, value: -1},
+		{ns: "dec2", n: "count", key: "A", dec: 1, value: -2},
+		{ns: "dec2", n: "ticker", key: "A", dec: 1, value: -1},
+		{ns: "dec2", n: "ticker", key: "A", dec: 2, value: -3},
+		{ns: "dec2", n: "count", key: "B", dec: 1, value: -1},
+		{ns: "dec2", n: "count", key: "B", dec: 1, value: -2},
+		{ns: "dec2", n: "ticker", key: "B", dec: 1, value: -1},
+		{ns: "dec2", n: "ticker", key: "B", dec: 2, value: -3},
+	}
+
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("subtract %d to %s.%s[%q] value: %d", test.dec, test.ns, test.n, test.key,
test.value),
+			func(t *testing.T) {
+				m := NewCounter(test.ns, test.n)
+				ctx := ctxWith(bID, test.key)
+				m.Dec(ctx, test.dec)
+
+				key := key{name: name{namespace: test.ns, name: test.n}, bundle: bID, ptransform: test.key}
+				q, ok := counters.Load(key)
+				if !ok {
+					t.Fatalf("Unable to find Counter for key %v", key)
+				}
+				c := q.(*counter)
+				if got, want := c.value, test.value; got != want {
+					t.Errorf("GetCounter(%q,%q).Dec(%s, %d) c.value got %v, want %v", test.ns, test.n, test.key,
test.dec, got, want)
+				}
+			})
+	}
+}
+
+func TestDistribution_Update(t *testing.T) {
+	tests := []struct {
+		ns, n, key           string // Gauge name and PTransform context
+		v                    int64
+		count, sum, min, max int64 // Internal variables to check
+	}{
+		{ns: "update1", n: "latency", key: "A", v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update1", n: "latency", key: "A", v: 1, count: 2, sum: 2, min: 1, max: 1},
+		{ns: "update1", n: "latency", key: "A", v: 1, count: 3, sum: 3, min: 1, max: 1},
+		{ns: "update1", n: "size", key: "A", v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update1", n: "size", key: "A", v: 2, count: 2, sum: 3, min: 1, max: 2},
+		{ns: "update1", n: "size", key: "A", v: 3, count: 3, sum: 6, min: 1, max: 3},
+		{ns: "update1", n: "size", key: "A", v: -4, count: 4, sum: 2, min: -4, max: 3},
+		{ns: "update1", n: "size", key: "A", v: 1, count: 5, sum: 3, min: -4, max: 3},
+		{ns: "update1", n: "latency", key: "B", v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update1", n: "latency", key: "B", v: 1, count: 2, sum: 2, min: 1, max: 1},
+		{ns: "update1", n: "size", key: "B", v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update1", n: "size", key: "B", v: 2, count: 2, sum: 3, min: 1, max: 2},
+		{ns: "update2", n: "latency", key: "A", v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update2", n: "latency", key: "A", v: 1, count: 2, sum: 2, min: 1, max: 1},
+		{ns: "update2", n: "size", key: "A", v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update2", n: "size", key: "A", v: 2, count: 2, sum: 3, min: 1, max: 2},
+		{ns: "update2", n: "latency", key: "B", v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update2", n: "latency", key: "B", v: 1, count: 2, sum: 2, min: 1, max: 1},
+		{ns: "update2", n: "size", key: "B", v: 1, count: 1, sum: 1, min: 1, max: 1},
+		{ns: "update2", n: "size", key: "B", v: 2, count: 2, sum: 3, min: 1, max: 2},
+		{ns: "update1", n: "size", key: "A", v: 1, count: 6, sum: 4, min: -4, max: 3},
+	}
+
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("add %d to %s.%s[%q] count: %d sum: %d", test.v, test.ns, test.n, test.key,
test.count, test.sum),
+			func(t *testing.T) {
+				m := NewDistribution(test.ns, test.n)
+				ctx := ctxWith(bID, test.key)
+				m.Update(ctx, test.v)
+
+				key := key{name: name{namespace: test.ns, name: test.n}, bundle: bID, ptransform: test.key}
+				q, ok := distributions.Load(key)
+				if !ok {
+					t.Fatalf("Unable to find Distribution for key %v", key)
+				}
+				d := q.(*distribution)
+				if got, want := d.count, test.count; got != want {
+					t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.count got %v, want %v", test.ns, test.n,
test.key, test.v, got, want)
+				}
+				if got, want := d.sum, test.sum; got != want {
+					t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.sum got %v, want %v", test.ns, test.n,
test.key, test.v, got, want)
+				}
+				if got, want := d.min, test.min; got != want {
+					t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.min got %v, want %v", test.ns, test.n,
test.key, test.v, got, want)
+				}
+				if got, want := d.max, test.max; got != want {
+					t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.max got %v, want %v", test.ns, test.n,
test.key, test.v, got, want)
+				}
+			})
+	}
+}
+
+func testclock(t time.Time) func() time.Time {
+	return func() time.Time { return t }
+}
+
+func TestGauge_Set(t *testing.T) {
+	tests := []struct {
+		ns, n, key string // Gauge name and PTransform context
+		v          int64
+		t          time.Time
+	}{
+		{ns: "set1", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "speed", key: "A", v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "speed", key: "A", v: 2, t: time.Unix(0, 0)},
+		{ns: "set1", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "speed", key: "B", v: 1, t: time.Unix(0, 0)},
+		{ns: "set1", n: "speed", key: "B", v: 2, t: time.Unix(0, 0)},
+		{ns: "set2", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "speed", key: "A", v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "speed", key: "A", v: 2, t: time.Unix(0, 0)},
+		{ns: "set2", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "speed", key: "B", v: 1, t: time.Unix(0, 0)},
+		{ns: "set2", n: "speed", key: "B", v: 2, t: time.Unix(0, 0)},
+	}
+
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("set %s.%s[%q] to %d at %v", test.ns, test.n, test.key, test.v, test.t),
+			func(t *testing.T) {
+				m := NewGauge(test.ns, test.n)
+				ctx := ctxWith(bID, test.key)
+				now = testclock(test.t)
+				m.Set(ctx, test.v)
+
+				key := key{name: name{namespace: test.ns, name: test.n}, bundle: bID, ptransform: test.key}
+				q, ok := gauges.Load(key)
+				if !ok {
+					t.Fatalf("Unable to find Gauge for key %v", key)
+				}
+				g := q.(*gauge)
+				if got, want := g.v, test.v; got != want {
+					t.Errorf("GetGauge(%q,%q).Set(%s, %d) g.v got %v, want %v", test.ns, test.n, test.key,
test.v, got, want)
+				}
+				if got, want := g.t, test.t; got != want {
+					t.Errorf("GetGauge(%q,%q).Set(%s, %d) t.t got %v, want %v", test.ns, test.n, test.key,
test.v, got, want)
+				}
+			})
+	}
+}
+
+type metricType uint8
+
+const (
+	counterType metricType = iota
+	distributionType
+	gaugeType
+)
+
+func (t metricType) String() string {
+	switch t {
+	case counterType:
+		return "Counter"
+	case distributionType:
+		return "Distribution"
+	case gaugeType:
+		return "Gauge"
+	default:
+		panic(fmt.Sprintf("Unknown metric type value: %v", uint8(t)))
+	}
+}
+func TestNameCollisions(t *testing.T) {
+	ns, c, d, g := "collisions", "counter", "distribution", "gauge"
+	// Checks that user code panics if a counter attempts to be defined in the same PTransform
+	// Collisions are unfortunately only detectable at runtime, and only if both the initial
+	// metric, and the new metric are actually used, since we don't know the context until
+	// then.
+	// Pre-create and use so that we have existing metrics to collide with.
+	NewCounter(ns, c).Inc(ctxWith(bID, c), 1)
+	NewDistribution(ns, d).Update(ctxWith(bID, d), 1)
+	NewGauge(ns, g).Set(ctxWith(bID, g), 1)
+	tests := []struct {
+		existing, new metricType
+	}{
+		{existing: counterType, new: counterType},
+		{existing: counterType, new: distributionType},
+		{existing: counterType, new: gaugeType},
+		{existing: distributionType, new: counterType},
+		{existing: distributionType, new: distributionType},
+		{existing: distributionType, new: gaugeType},
+		{existing: gaugeType, new: counterType},
+		{existing: gaugeType, new: distributionType},
+		{existing: gaugeType, new: gaugeType},
+	}
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("%s name collides with %s", test.existing, test.new),
+			func(t *testing.T) {
+				defer func() {
+					if test.existing != test.new {
+						if e := recover(); e != nil {
+							t.Logf("panic caught re-using a name between a %s, and a %s", test.existing, test.new)
+							return
+						}
+						t.Error("panic expected")
+					} else {
+						t.Log("reusing names is fine when the metrics the same type:", test.existing, test.new)
+					}
+				}()
+				var name string
+				switch test.existing {
+				case counterType:
+					name = c
+				case distributionType:
+					name = d
+				case gaugeType:
+					name = g
+				default:
+					t.Fatalf("unknown existing metricType with value: %v", int(test.existing))
+				}
+				switch test.new {
+				case counterType:
+					NewCounter(ns, name).Inc(ctxWith(bID, name), 1)
+				case distributionType:
+					NewDistribution(ns, name).Update(ctxWith(bID, name), 1)
+				case gaugeType:
+					NewGauge(ns, name).Set(ctxWith(bID, name), 1)
+				default:
+					t.Fatalf("unknown new metricType with value: %v", int(test.new))
+				}
+
+			})
+	}
+}
+
+func TestClearBundleData(t *testing.T) {
+	Clear()
+	dump := func(t *testing.T) {
+		dumpTo(func(format string, args ...interface{}) {
+			t.Logf(format, args...)
+		})
+	}
+	pt, c, d, g := "clear.bundle.data", "counter", "distribution", "gauge"
+	aBundleID := "aBID"
+	otherBundleID := "otherBID"
+	NewCounter(pt, c).Inc(ctxWith(aBundleID, pt), 1)
+	NewDistribution(pt, d).Update(ctxWith(aBundleID, pt), 1)
+	NewGauge(pt, g).Set(ctxWith(aBundleID, pt), 1)
+
+	NewCounter(pt, c).Inc(ctxWith(otherBundleID, pt), 1)
+	NewDistribution(pt, d).Update(ctxWith(otherBundleID, pt), 1)
+	NewGauge(pt, g).Set(ctxWith(otherBundleID, pt), 1)
+
+	initialAP := ToProto(aBundleID, pt)
+	if got, want := len(initialAP), 3; got != want {
+		dump(t)
+		t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - initialAP: %v", aBundleID, pt, got, want,
initialAP)
+	}
+	initialOP := ToProto(otherBundleID, pt)
+	if got, want := len(initialOP), 3; got != want {
+		dump(t)
+		t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - initialOP: %v", otherBundleID, pt, got,
want, initialOP)
+	}
+
+	ClearBundleData(aBundleID)
+
+	newAP := ToProto(aBundleID, pt)
+	if got, want := len(newAP), 0; got != want {
+		dump(t)
+		t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - newAP: %v", aBundleID, pt, got, want, newAP)
+	}
+
+	newOP := ToProto(otherBundleID, pt)
+	if got, want := len(newOP), 3; got != want {
+		dump(t)
+		t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - newOP: %v", otherBundleID, pt, got, want,
newOP)
+	}
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index f0d0a561357..a2f7d7ba4b2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 )
@@ -35,6 +36,7 @@ type ParDo struct {
 	Side    []ReStream
 	Out     []Node
 
+	PID       string
 	ready     bool
 	sideinput []ReusableInput
 	emitters  []ReusableEmitter
@@ -92,6 +94,8 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values
...ReS
 		return fmt.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status)
 	}
 
+	ctx = metrics.SetPTransformID(ctx, n.PID)
+
 	val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), &MainInput{Key:
elm, Values: values})
 	if err != nil {
 		return n.fail(err)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 54dc812c5fe..f9168212aa1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -21,15 +21,19 @@ import (
 	"context"
 	"fmt"
 	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 )
 
 // Plan represents the bundle execution plan. It will generally be constructed
 // from a part of a pipeline. A plan can be used to process multiple bundles
 // serially.
 type Plan struct {
-	id    string
-	roots []Root
-	units []Unit
+	id       string
+	roots    []Root
+	units    []Unit
+	parDoIds []string
 
 	status Status
 
@@ -41,6 +45,7 @@ type Plan struct {
 func NewPlan(id string, units []Unit) (*Plan, error) {
 	var roots []Root
 	var source *DataSource
+	var pardoIDs []string
 
 	for _, u := range units {
 		if u == nil {
@@ -52,17 +57,21 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 		if s, ok := u.(*DataSource); ok {
 			source = s
 		}
+		if p, ok := u.(*ParDo); ok {
+			pardoIDs = append(pardoIDs, p.PID)
+		}
 	}
 	if len(roots) == 0 {
 		return nil, fmt.Errorf("no root units")
 	}
 
 	return &Plan{
-		id:     id,
-		status: Initializing,
-		roots:  roots,
-		units:  units,
-		source: source,
+		id:       id,
+		status:   Initializing,
+		roots:    roots,
+		units:    units,
+		parDoIds: pardoIDs,
+		source:   source,
 	}, nil
 }
 
@@ -75,6 +84,7 @@ func (p *Plan) ID() string {
 // are brought up on the first execution. If a bundle fails, the plan cannot
 // be reused for further bundles. Does not panic. Blocking.
 func (p *Plan) Execute(ctx context.Context, id string, manager DataManager) error {
+	ctx = metrics.SetBundleID(ctx, p.id)
 	if p.status == Initializing {
 		for _, u := range p.units {
 			if err := callNoPanic(ctx, u.Up); err != nil {
@@ -147,7 +157,28 @@ func (p *Plan) String() string {
 	return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
 }
 
-// ProgressReport returns a snapshot of input progress of the plan.
-func (p *Plan) ProgressReport() ProgressReportSnapshot {
-	return p.source.Progress()
+// Metrics returns a snapshot of input progress of the plan, and associated metrics.
+func (p *Plan) Metrics() *fnpb.Metrics {
+	snapshot := p.source.Progress()
+
+	transforms := map[string]*fnpb.Metrics_PTransform{
+		snapshot.ID: &fnpb.Metrics_PTransform{
+			ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{
+				Measured: &fnpb.Metrics_PTransform_Measured{
+					OutputElementCounts: map[string]int64{
+						snapshot.Name: snapshot.Count,
+					},
+				},
+			},
+		},
+	}
+
+	for _, pt := range p.parDoIds {
+		transforms[pt] = &fnpb.Metrics_PTransform{
+			User: metrics.ToProto(p.id, pt),
+		}
+	}
+	return &fnpb.Metrics{
+		Ptransforms: transforms,
+	}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 291ef3a0d1d..946f216fd5b 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -17,6 +17,7 @@ package exec
 
 import (
 	"fmt"
+	"path"
 	"strconv"
 	"strings"
 
@@ -295,11 +296,13 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 
 			switch op {
 			case graph.ParDo:
-				n := &ParDo{UID: b.idgen.New(), Inbound: in, Out: out}
+				n := &ParDo{UID: b.idgen.New(), PID: id.to, Inbound: in, Out: out}
 				n.Fn, err = graph.AsDoFn(fn)
 				if err != nil {
 					return nil, err
 				}
+				// TODO(lostluck): 2018/03/22 Look into why transform.UniqueName isn't populated at this
point, and switch n.PID to that instead.
+				n.PID = path.Base(n.Fn.Name())
 				if len(in) == 1 {
 					u = n
 					break
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 57a8854591d..5e545c550f9 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -154,7 +154,7 @@ type control struct {
 
 func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse
{
 	id := req.GetInstructionId()
-	ctx = context.WithValue(ctx, instKey, id)
+	ctx = setInstID(ctx, id)
 
 	switch {
 	case req.GetRegister() != nil:
@@ -202,7 +202,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		}
 
 		err := plan.Execute(ctx, id, c.data)
-
+		m := plan.Metrics()
 		// Move the plan back to the candidate state
 		c.mu.Lock()
 		c.plans[plan.ID()] = plan
@@ -216,7 +216,9 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		return &fnpb.InstructionResponse{
 			InstructionId: id,
 			Response: &fnpb.InstructionResponse_ProcessBundle{
-				ProcessBundle: &fnpb.ProcessBundleResponse{},
+				ProcessBundle: &fnpb.ProcessBundleResponse{
+					Metrics: m,
+				},
 			},
 		}
 
@@ -233,25 +235,13 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 			return fail(id, "execution plan for %v not found", ref)
 		}
 
-		snapshot := plan.ProgressReport()
+		m := plan.Metrics()
 
 		return &fnpb.InstructionResponse{
 			InstructionId: id,
 			Response: &fnpb.InstructionResponse_ProcessBundleProgress{
 				ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{
-					Metrics: &fnpb.Metrics{
-						Ptransforms: map[string]*fnpb.Metrics_PTransform{
-							snapshot.ID: &fnpb.Metrics_PTransform{
-								ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{
-									Measured: &fnpb.Metrics_PTransform_Measured{
-										OutputElementCounts: map[string]int64{
-											snapshot.Name: snapshot.Count,
-										},
-									},
-								},
-							},
-						},
-					},
+					Metrics: m,
 				},
 			},
 		}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/logging.go b/sdks/go/pkg/beam/core/runtime/harness/logging.go
index 68e225e246d..f7608cdffde 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/logging.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/logging.go
@@ -32,8 +32,9 @@ import (
 
 // TODO(herohde) 10/13/2017: add top-level harness.Main panic handler that flushes logs.
 // Also make logger flush on Fatal severity messages.
+type contextKey string
 
-const instKey = "beam:inst"
+const instKey contextKey = "beam:inst"
 
 func setInstID(ctx context.Context, id string) context.Context {
 	return context.WithValue(ctx, instKey, id)
diff --git a/sdks/go/pkg/beam/metrics.go b/sdks/go/pkg/beam/metrics.go
new file mode 100644
index 00000000000..5e5bef6ecba
--- /dev/null
+++ b/sdks/go/pkg/beam/metrics.go
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package beam
+
+import (
+	"context"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+)
+
+// Counter is a metric that can be incremented and decremented,
+// and is aggregated by the sum.
+type Counter struct {
+	metrics.Counter
+}
+
+// Inc increments the counter within by the given amount.
+func (c Counter) Inc(ctx context.Context, v int64) {
+	c.Counter.Inc(ctx, v)
+}
+
+// Dec decrements the counter within by the given amount.
+func (c Counter) Dec(ctx context.Context, v int64) {
+	c.Counter.Dec(ctx, v)
+}
+
+// NewCounter returns the Counter with the given namespace and name.
+func NewCounter(namespace, name string) Counter {
+	return Counter{metrics.NewCounter(namespace, name)}
+}
+
+// Distribution is a metric that records various statistics about the distribution
+// of reported values.
+type Distribution struct {
+	metrics.Distribution
+}
+
+// Update adds an observation to this distribution.
+func (c Distribution) Update(ctx context.Context, v int64) {
+	c.Distribution.Update(ctx, v)
+}
+
+// NewDistribution returns the Distribution with the given namespace and name.
+func NewDistribution(namespace, name string) Distribution {
+	return Distribution{metrics.NewDistribution(namespace, name)}
+}
+
+// Gauge is a metric that can have its new value set, and is aggregated by taking
+// the last reported value.
+type Gauge struct {
+	metrics.Gauge
+}
+
+// Set sets the current value for this gauge.
+func (c Gauge) Set(ctx context.Context, v int64) {
+	c.Gauge.Set(ctx, v)
+}
+
+// NewGauge returns the Gauge with the given namespace and name.
+func NewGauge(namespace, name string) Gauge {
+	return Gauge{metrics.NewGauge(namespace, name)}
+}
diff --git a/sdks/go/pkg/beam/metrics_test.go b/sdks/go/pkg/beam/metrics_test.go
new file mode 100644
index 00000000000..11b44101671
--- /dev/null
+++ b/sdks/go/pkg/beam/metrics_test.go
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package beam_test
+
+import (
+	"context"
+	"regexp"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+)
+
+// A beam_test global context var to improve how the examples look.
+var ctx = context.Background()
+
+func ctxWithPtransformID(id string) context.Context {
+	ctx := context.Background()
+	ctx = metrics.SetPTransformID(ctx, id)
+	ctx = metrics.SetBundleID(ctx, "exampleBundle")
+	return ctx
+}
+
+func dumpAndClearMetrics() {
+	metrics.DumpToOut()
+	metrics.Clear()
+}
+
+var (
+	wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+)
+
+func Example_metricsDeclaredAnywhere() {
+
+	// Metrics can be declared outside DoFns, and used inside..
+	outside := beam.NewCounter("example.namespace", "count")
+
+	extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
+		// They can be defined at time of use within a DoFn, if necessary.
+		inside := beam.NewDistribution("example.namespace", "characters")
+		for _, word := range wordRE.FindAllString(line, -1) {
+			emit(word)
+			outside.Inc(ctx, 1)
+			inside.Update(ctx, int64(len(word)))
+		}
+	}
+	ctx := ctxWithPtransformID("example")
+	extractWordsDofn(ctx, "this has six words in it", func(string) {})
+	extractWordsDofn(ctx, "this has seven words in it, see?", func(string) {})
+
+	dumpAndClearMetrics()
+	// Output: Bundle: "exampleBundle" - PTransformID: "example"
+	//	example.namespace.characters - count: 13 sum: 43 min: 2 max: 5
+	//	example.namespace.count - value: 13
+}
+
+func Example_metricsReusable() {
+
+	// Metrics can be used in multiple DoFns
+	c := beam.NewCounter("example.reusable", "count")
+
+	extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
+		for _, word := range wordRE.FindAllString(line, -1) {
+			emit(word)
+			c.Inc(ctx, 1)
+		}
+	}
+
+	extractRunesDofn := func(ctx context.Context, line string, emit func(rune)) {
+		for _, r := range line {
+			emit(r)
+			c.Inc(ctx, 1)
+		}
+	}
+	extractWordsDofn(ctxWithPtransformID("extract1"), "this has six words in it", func(string)
{})
+
+	extractRunesDofn(ctxWithPtransformID("extract2"), "seven thousand", func(rune) {})
+
+	dumpAndClearMetrics()
+	// Output: Bundle: "exampleBundle" - PTransformID: "extract1"
+	//	example.reusable.count - value: 6
+	// Bundle: "exampleBundle" - PTransformID: "extract2"
+	//	example.reusable.count - value: 14
+}
+
+func ExampleCounter_Inc() {
+	c := beam.NewCounter("example", "size")
+	c.Inc(ctx, int64(len("foobar")))
+}
+
+func ExampleCounter_Dec() {
+	c := beam.NewCounter("example", "size")
+	c.Dec(ctx, int64(len("foobar")))
+}
+
+func ExampleDistribution_Update() {
+	t := time.Millisecond * 42
+	d := beam.NewDistribution("example", "latency_micros")
+	d.Update(ctx, int64(t/time.Microsecond))
+}
+
+func ExampleGauge_Set() {
+	g := beam.NewGauge("example", "progress")
+	g.Set(ctx, 42)
+}
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index 78945daa0cc..1eb7c654669 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -18,9 +18,11 @@ package direct
 import (
 	"context"
 	"fmt"
+	"path"
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
@@ -48,7 +50,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		plan.Down(ctx) // ignore any teardown errors
 		return err
 	}
-	return plan.Down(ctx)
+	if err = plan.Down(ctx); err != nil {
+		return err
+	}
+	metrics.DumpToLog(ctx)
+	return nil
 }
 
 // Compile translates a pipeline to a multi-bundle execution plan.
@@ -206,6 +212,7 @@ func (b *builder) makeLink(id linkID) (exec.Node, error) {
 	switch edge.Op {
 	case graph.ParDo:
 		pardo := &exec.ParDo{UID: b.idgen.New(), Fn: edge.DoFn, Inbound: edge.Input, Out: out}
+		pardo.PID = path.Base(pardo.Fn.Name())
 		if len(edge.Input) == 1 {
 			u = pardo
 			break


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 85089)
    Time Spent: 9.5h  (was: 9h 20m)

> Fn API metrics in Go SDK harness
> --------------------------------
>
>                 Key: BEAM-3545
>                 URL: https://issues.apache.org/jira/browse/BEAM-3545
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Kenneth Knowles
>            Assignee: Robert Burke
>            Priority: Major
>              Labels: portability
>          Time Spent: 9.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Mime
View raw message