htrace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject incubator-htrace git commit: HTRACE-280. htraced: add metrics about total spans added and dropped per address (cmccabe)
Date Fri, 23 Oct 2015 02:42:35 GMT
Repository: incubator-htrace
Updated Branches:
  refs/heads/master 97faf708b -> c28fc60dc


HTRACE-280. htraced: add metrics about total spans added and dropped per address (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/c28fc60d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/c28fc60d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/c28fc60d

Branch: refs/heads/master
Commit: c28fc60dcfd83f61afcb7c2685ada1f2fafe7bf0
Parents: 97faf70
Author: Colin P. Mccabe <cmccabe@apache.org>
Authored: Thu Oct 22 19:38:38 2015 -0700
Committer: Colin P. Mccabe <cmccabe@apache.org>
Committed: Thu Oct 22 19:39:09 2015 -0700

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/common/process.go  |   2 +-
 .../org/apache/htrace/common/process_test.go    |   8 +-
 .../go/src/org/apache/htrace/common/rpc.go      |  58 +++--
 .../src/org/apache/htrace/conf/config_keys.go   |  18 ++
 .../go/src/org/apache/htrace/htrace/cmd.go      |  29 ++-
 .../org/apache/htrace/htraced/client_test.go    |   4 +-
 .../src/org/apache/htrace/htraced/datastore.go  | 121 +++++++---
 .../org/apache/htrace/htraced/datastore_test.go |  97 +++++---
 .../org/apache/htrace/htraced/heartbeater.go    | 117 ++++++++++
 .../apache/htrace/htraced/heartbeater_test.go   | 100 ++++++++
 .../go/src/org/apache/htrace/htraced/hrpc.go    |  18 +-
 .../go/src/org/apache/htrace/htraced/metrics.go | 234 +++++++++++++++++++
 .../org/apache/htrace/htraced/metrics_test.go   | 187 +++++++++++++++
 .../org/apache/htrace/htraced/mini_htraced.go   |  10 +-
 .../go/src/org/apache/htrace/htraced/rest.go    |   8 +-
 15 files changed, 906 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/process.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/process.go b/htrace-htraced/go/src/org/apache/htrace/common/process.go
index 419d6fe..aad6ca1 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/process.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/process.go
@@ -69,7 +69,7 @@ func InstallSignalHandlers(cnf *conf.Config) {
 	sigQuitChan := make(chan os.Signal, 1)
 	signal.Notify(sigQuitChan, syscall.SIGQUIT)
 	go func() {
-		bufSize := 1<<20
+		bufSize := 1 << 20
 		buf := make([]byte, bufSize)
 		for {
 			<-sigQuitChan

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/process_test.go b/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
index 7609133..d3f5a56 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/process_test.go
@@ -42,7 +42,7 @@ func TestSignals(t *testing.T) {
 		os.Exit(0)
 	}
 	helper := exec.Command(os.Args[0], "-test.run=TestSignals", "--")
-	helper.Env = []string { HTRACED_TEST_HELPER_PROCESS + "=1" }
+	helper.Env = []string{HTRACED_TEST_HELPER_PROCESS + "=1"}
 	stdoutPipe, err := helper.StdoutPipe()
 	if err != nil {
 		panic(fmt.Sprintf("Failed to open pipe to process stdout: %s",
@@ -77,7 +77,7 @@ func TestSignals(t *testing.T) {
 		}
 		t.Logf("Saw 'Terminating on signal: SIGINT'.  " +
 			"Helper goroutine exiting.\n")
-		done<-nil
+		done <- nil
 	}()
 	scanner := bufio.NewScanner(stderrPipe)
 	for scanner.Scan() {
@@ -97,9 +97,9 @@ func TestSignals(t *testing.T) {
 
 // Run the helper process which TestSignals spawns.
 func runHelperProcess() {
-	cnfMap := map[string]string {
+	cnfMap := map[string]string{
 		conf.HTRACE_LOG_LEVEL: "TRACE",
-		conf.HTRACE_LOG_PATH: "", // log to stdout
+		conf.HTRACE_LOG_PATH:  "", // log to stdout
 	}
 	cnfBld := conf.Builder{Values: cnfMap, Defaults: conf.DEFAULTS}
 	cnf, err := cnfBld.Build()

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 9c7bfad..34ed15e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,8 +38,10 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
 
 // A request to write spans to htraced.
 type WriteSpansReq struct {
-	DefaultTrid string `json:",omitempty"`
-	Spans       []*Span
+	Addr          string // This gets filled in by the RPC layer.
+	DefaultTrid   string `json:",omitempty"`
+	Spans         []*Span
+	ClientDropped uint64 `json:",omitempty"`
 }
 
 // Info returned by /server/info
@@ -55,22 +57,6 @@ type ServerInfo struct {
 type WriteSpansResp struct {
 }
 
-// Info returned by /server/stats
-type ServerStats struct {
-	Shards []ShardStats
-}
-
-type ShardStats struct {
-	Path string
-
-	// The approximate number of spans present in this shard.  This may be an
-	// underestimate.
-	ApproxNumSpans uint64
-
-	// leveldb.stats information
-	LevelDbStats string
-}
-
 // The header which is sent over the wire for HRPC
 type HrpcRequestHeader struct {
 	Magic    uint32
@@ -104,3 +90,39 @@ func HrpcMethodNameToId(name string) uint32 {
 		return METHOD_ID_NONE
 	}
 }
+
+type SpanMetrics struct {
+	// The total number of spans written to HTraced.
+	Written uint64
+
+	// The total number of spans dropped by the server.
+	ServerDropped uint64
+
+	// The total number of spans dropped by the client.  Note that this number
+	// is tracked on the client itself and doesn't get updated if the client
+	// can't contact the server.
+	ClientDropped uint64
+}
+
+// A map from network address strings to SpanMetrics structures.
+type SpanMetricsMap map[string]*SpanMetrics
+
+// Info returned by /server/stats
+type ServerStats struct {
+	// Statistics for each shard (directory)
+	Dirs []StorageDirectoryStats
+
+	// Per-host Span Metrics
+	HostSpanMetrics SpanMetricsMap
+}
+
+type StorageDirectoryStats struct {
+	Path string
+
+	// The approximate number of spans present in this shard.  This may be an
+	// underestimate.
+	ApproxNumSpans uint64
+
+	// leveldb.stats information
+	LevelDbStats string
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index ccb09e0..487762b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -68,6 +68,13 @@ const HTRACE_LOG_PATH = "log.path"
 // The log level to use for the logs in htrace.
 const HTRACE_LOG_LEVEL = "log.level"
 
+// The period between metrics heartbeats.  This is the approximate interval at which we will
+// update global metrics.
+const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms"
+
+// The maximum number of addresses for which we will maintain metrics.
+const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
+
 // A host:port pair to send information to on startup.  This is used in unit
 // tests to determine the (random) port of the htraced process that has been
 // started.
@@ -83,4 +90,15 @@ var DEFAULTS = map[string]string{
 	HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100",
 	HTRACE_LOG_PATH:                    "",
 	HTRACE_LOG_LEVEL:                   "INFO",
+	HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
+	HTRACE_METRICS_MAX_ADDR_ENTRIES:    "100000",
+}
+
+// Values to be used when creating test configurations
+func TEST_VALUES() map[string]string {
+	return map[string]string{
+		HTRACE_HRPC_ADDRESS: ":0",    // use a random port for the HRPC server
+		HTRACE_LOG_LEVEL:    "TRACE", // show all log messages in tests
+		HTRACE_WEB_ADDRESS:  ":0",    // use a random port for the REST server
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index f6972bb..749acdf 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -31,6 +31,7 @@ import (
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
 	"os"
+	"sort"
 	"strings"
 	"time"
 )
@@ -196,15 +197,29 @@ func printServerStats(hcl *htrace.Client) int {
 		fmt.Println(err.Error())
 		return EXIT_FAILURE
 	}
-	fmt.Printf("HTraced server stats:\n")
-	fmt.Printf("%d leveldb shards.\n", len(stats.Shards))
-	for i := range stats.Shards {
-		shard := stats.Shards[i]
-		fmt.Printf("==== %s ===\n", shard.Path)
-		fmt.Printf("Approximate number of spans: %d\n", shard.ApproxNumSpans)
-		stats := strings.Replace(shard.LevelDbStats, "\\n", "\n", -1)
+	fmt.Printf("HTRACED SERVER STATS:\n")
+	fmt.Printf("%d leveldb directories.\n", len(stats.Dirs))
+	for i := range stats.Dirs {
+		dir := stats.Dirs[i]
+		fmt.Printf("==== %s ===\n", dir.Path)
+		fmt.Printf("Approximate number of spans: %d\n", dir.ApproxNumSpans)
+		stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
 		fmt.Printf("%s\n", stats)
 	}
+	fmt.Printf("HOST SPAN METRICS:\n")
+	mtxMap := stats.HostSpanMetrics
+	keys := make(sort.StringSlice, len(mtxMap))
+	i := 0
+	for k, _ := range mtxMap {
+		keys[i] = k
+		i++
+	}
+	sort.Sort(keys)
+	for k := range keys {
+		mtx := mtxMap[keys[k]]
+		fmt.Printf("%s: written: %d, server dropped %d, client dropped %d\n",
+			keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped)
+	}
 	return EXIT_SUCCESS
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 540e688..ca0a425 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -210,7 +210,7 @@ const EXAMPLE_CONF_VALUE = "foo.bar.baz"
 
 func TestClientGetServerConf(t *testing.T) {
 	htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf",
-		Cnf: map[string]string {
+		Cnf: map[string]string{
 			EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE,
 		},
 		DataDirs: make([]string, 2)}
@@ -230,6 +230,6 @@ func TestClientGetServerConf(t *testing.T) {
 	}
 	if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE {
 		t.Fatalf("unexpected value for %s: %s",
-				EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
+			EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 9fb9920..780b6d2 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -31,7 +31,6 @@ import (
 	"os"
 	"strconv"
 	"strings"
-	"sync/atomic"
 )
 
 //
@@ -77,19 +76,12 @@ const DURATION_INDEX_PREFIX = 'd'
 const PARENT_ID_INDEX_PREFIX = 'p'
 const INVALID_INDEX_PREFIX = 0
 
-type Statistics struct {
-	NumSpansWritten uint64
-}
-
-func (stats *Statistics) IncrementWrittenSpans() {
-	atomic.AddUint64(&stats.NumSpansWritten, 1)
-}
+type IncomingSpan struct {
+	// The address that the span was sent from.
+	Addr string
 
-// Make a copy of the statistics structure, using atomic operations.
-func (stats *Statistics) Copy() *Statistics {
-	return &Statistics{
-		NumSpansWritten: atomic.LoadUint64(&stats.NumSpansWritten),
-	}
+	// The span.
+	*common.Span
 }
 
 // A single directory containing a levelDB instance.
@@ -104,27 +96,48 @@ type shard struct {
 	path string
 
 	// Incoming requests to write Spans.
-	incoming chan *common.Span
+	incoming chan *IncomingSpan
+
+	// A channel for incoming heartbeats
+	heartbeats chan interface{}
 
 	// The channel we will send a bool to when we exit.
 	exited chan bool
+
+	// Per-address metrics
+	mtxMap ServerSpanMetricsMap
+
+	// The maximum number of metrics to allow in our map
+	maxMtx int
 }
 
 // Process incoming spans for a shard.
 func (shd *shard) processIncoming() {
 	lg := shd.store.lg
+	defer func() {
+		lg.Infof("Shard processor for %s exiting.\n", shd.path)
+		shd.exited <- true
+	}()
 	for {
-		span := <-shd.incoming
-		if span == nil {
-			lg.Infof("Shard processor for %s exiting.\n", shd.path)
-			shd.exited <- true
-			return
-		}
-		err := shd.writeSpan(span)
-		if err != nil {
-			lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
-		} else {
-			lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+		select {
+		case span := <-shd.incoming:
+			if span == nil {
+				return
+			}
+			err := shd.writeSpan(span)
+			if err != nil {
+				lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
+			} else {
+				lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+			}
+		case <-shd.heartbeats:
+			lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path)
+			mtxMap := make(ServerSpanMetricsMap)
+			for addr, mtx := range shd.mtxMap {
+				mtxMap[addr] = mtx.Clone()
+				mtx.Clear()
+			}
+			shd.store.msink.UpdateMetrics(mtxMap)
 		}
 	}
 }
@@ -150,15 +163,19 @@ func u64toSlice(val uint64) []byte {
 		byte(0xff & (val >> 0))}
 }
 
-func (shd *shard) writeSpan(span *common.Span) error {
+func (shd *shard) writeSpan(ispan *IncomingSpan) error {
 	batch := levigo.NewWriteBatch()
 	defer batch.Close()
 
 	// Add SpanData to batch.
 	spanDataBuf := new(bytes.Buffer)
 	spanDataEnc := gob.NewEncoder(spanDataBuf)
+	span := ispan.Span
 	err := spanDataEnc.Encode(span.SpanData)
 	if err != nil {
+		shd.store.lg.Errorf("Error encoding span %s: %s\n",
+			span.String(), err.Error())
+		shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
 		return err
 	}
 	primaryKey :=
@@ -185,9 +202,12 @@ func (shd *shard) writeSpan(span *common.Span) error {
 
 	err = shd.ldb.Write(shd.store.writeOpts, batch)
 	if err != nil {
+		shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n",
+			span.String(), shd.path, err.Error())
+		shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
 		return err
 	}
-	shd.store.stats.IncrementWrittenSpans()
+	shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
 	if shd.store.WrittenSpans != nil {
 		shd.store.WrittenSpans <- span
 	}
@@ -238,9 +258,6 @@ type dataStore struct {
 	// The shards which manage our LevelDB instances.
 	shards []*shard
 
-	// I/O statistics for all shards.
-	stats Statistics
-
 	// The read options to use for LevelDB.
 	readOpts *levigo.ReadOptions
 
@@ -250,6 +267,12 @@ type dataStore struct {
 	// If non-null, a channel we will send spans to once we finish writing them.  This is only used
 	// for testing.
 	WrittenSpans chan *common.Span
+
+	// The metrics sink.
+	msink *MetricsSink
+
+	// The heartbeater which periodically asks shards to update the MetricsSink.
+	hb *Heartbeater
 }
 
 func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
@@ -286,11 +309,24 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataSto
 		}
 		store.shards = append(store.shards, shd)
 	}
+	store.msink = NewMetricsSink(cnf)
 	for idx := range store.shards {
 		shd := store.shards[idx]
 		shd.exited = make(chan bool, 1)
+		shd.heartbeats = make(chan interface{}, 1)
+		shd.mtxMap = make(ServerSpanMetricsMap)
+		shd.maxMtx = store.msink.maxMtx
 		go shd.processIncoming()
 	}
+	store.hb = NewHeartbeater("DatastoreHeartbeater",
+		cnf.GetInt64(conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS), lg)
+	for shdIdx := range store.shards {
+		shd := store.shards[shdIdx]
+		store.hb.AddHeartbeatTarget(&HeartbeatTarget{
+			name:       fmt.Sprintf("shard(%s)", shd.path),
+			targetChan: shd.heartbeats,
+		})
+	}
 	return store, nil
 }
 
@@ -372,7 +408,7 @@ func CreateShard(store *dataStore, cnf *conf.Config, path string,
 	}
 	spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
 	shd = &shard{store: store, ldb: ldb, path: path,
-		incoming: make(chan *common.Span, spanBufferSize)}
+		incoming: make(chan *IncomingSpan, spanBufferSize)}
 	return shd, nil
 }
 
@@ -406,16 +442,24 @@ func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error {
 	return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
 }
 
-func (store *dataStore) GetStatistics() *Statistics {
-	return store.stats.Copy()
+func (store *dataStore) GetSpanMetrics() common.SpanMetricsMap {
+	return store.msink.AccessTotals()
 }
 
 // Close the DataStore.
 func (store *dataStore) Close() {
+	if store.hb != nil {
+		store.hb.Shutdown()
+		store.hb = nil
+	}
 	for idx := range store.shards {
 		store.shards[idx].Close()
 		store.shards[idx] = nil
 	}
+	if store.msink != nil {
+		store.msink.Shutdown()
+		store.msink = nil
+	}
 	if store.readOpts != nil {
 		store.readOpts.Close()
 		store.readOpts = nil
@@ -435,7 +479,7 @@ func (store *dataStore) getShardIndex(sid common.SpanId) int {
 	return int(sid.Hash32() % uint32(len(store.shards)))
 }
 
-func (store *dataStore) WriteSpan(span *common.Span) {
+func (store *dataStore) WriteSpan(span *IncomingSpan) {
 	store.shards[store.getShardIndex(span.Id)].incoming <- span
 }
 
@@ -954,11 +998,11 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error)
 
 func (store *dataStore) ServerStats() *common.ServerStats {
 	serverStats := common.ServerStats{
-		Shards: make([]common.ShardStats, len(store.shards)),
+		Dirs: make([]common.StorageDirectoryStats, len(store.shards)),
 	}
 	for shardIdx := range store.shards {
 		shard := store.shards[shardIdx]
-		serverStats.Shards[shardIdx].Path = shard.path
+		serverStats.Dirs[shardIdx].Path = shard.path
 		r := levigo.Range{
 			Start: append([]byte{SPAN_ID_INDEX_PREFIX},
 				common.INVALID_SPAN_ID.Val()...),
@@ -966,11 +1010,12 @@ func (store *dataStore) ServerStats() *common.ServerStats {
 				common.INVALID_SPAN_ID.Val()...),
 		}
 		vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
-		serverStats.Shards[shardIdx].ApproxNumSpans = vals[0]
-		serverStats.Shards[shardIdx].LevelDbStats =
+		serverStats.Dirs[shardIdx].ApproxNumSpans = vals[0]
+		serverStats.Dirs[shardIdx].LevelDbStats =
 			shard.ldb.PropertyValue("leveldb.stats")
 		store.lg.Infof("shard.ldb.PropertyValue(leveldb.stats)=%s\n",
 			shard.ldb.PropertyValue("leveldb.stats"))
 	}
+	serverStats.HostSpanMetrics = store.msink.AccessTotals()
 	return &serverStats
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 0caa509..50d2891 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -73,10 +73,13 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{
 
 func createSpans(spans []common.Span, store *dataStore) {
 	for idx := range spans {
-		store.WriteSpan(&spans[idx])
+		store.WriteSpan(&IncomingSpan{
+			Addr: "127.0.0.1:1234",
+			Span: &spans[idx],
+		})
 	}
 	// Wait the spans to be created
-	for i := 0; i < 3; i++ {
+	for i := 0; i < len(spans); i++ {
 		<-store.WrittenSpans
 	}
 }
@@ -85,6 +88,9 @@ func createSpans(spans []common.Span, store *dataStore) {
 func TestDatastoreWriteAndRead(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
+		Cnf: map[string]string{
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
 		WrittenSpans: make(chan *common.Span, 100)}
 	ht, err := htraceBld.Build()
 	if err != nil {
@@ -92,9 +98,13 @@ func TestDatastoreWriteAndRead(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-		t.Fatal()
-	}
+
+	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+		"127.0.0.1:1234": &common.SpanMetrics{
+			Written: uint64(len(SIMPLE_TEST_SPANS)),
+		},
+	})
+
 	span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001"))
 	if span == nil {
 		t.Fatal()
@@ -147,6 +157,9 @@ func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
 func TestSimpleQuery(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
+		Cnf: map[string]string{
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
 		WrittenSpans: make(chan *common.Span, 100)}
 	ht, err := htraceBld.Build()
 	if err != nil {
@@ -154,9 +167,12 @@ func TestSimpleQuery(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-		t.Fatal()
-	}
+	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+		"127.0.0.1:1234": &common.SpanMetrics{
+			Written: uint64(len(SIMPLE_TEST_SPANS)),
+		},
+	})
+
 	testQuery(t, ht, &common.Query{
 		Predicates: []common.Predicate{
 			common.Predicate{
@@ -172,6 +188,9 @@ func TestSimpleQuery(t *testing.T) {
 func TestQueries2(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
+		Cnf: map[string]string{
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
 		WrittenSpans: make(chan *common.Span, 100)}
 	ht, err := htraceBld.Build()
 	if err != nil {
@@ -179,9 +198,11 @@ func TestQueries2(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-		t.Fatal()
-	}
+	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+		"127.0.0.1:1234": &common.SpanMetrics{
+			Written: uint64(len(SIMPLE_TEST_SPANS)),
+		},
+	})
 	testQuery(t, ht, &common.Query{
 		Predicates: []common.Predicate{
 			common.Predicate{
@@ -224,6 +245,9 @@ func TestQueries2(t *testing.T) {
 func TestQueries3(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
+		Cnf: map[string]string{
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
 		WrittenSpans: make(chan *common.Span, 100)}
 	ht, err := htraceBld.Build()
 	if err != nil {
@@ -231,9 +255,11 @@ func TestQueries3(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-		t.Fatal()
-	}
+	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+		"127.0.0.1:1234": &common.SpanMetrics{
+			Written: uint64(len(SIMPLE_TEST_SPANS)),
+		},
+	})
 	testQuery(t, ht, &common.Query{
 		Predicates: []common.Predicate{
 			common.Predicate{
@@ -276,6 +302,9 @@ func TestQueries3(t *testing.T) {
 func TestQueries4(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
+		Cnf: map[string]string{
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
 		WrittenSpans: make(chan *common.Span, 100)}
 	ht, err := htraceBld.Build()
 	if err != nil {
@@ -283,9 +312,11 @@ func TestQueries4(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-		t.Fatal()
-	}
+	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+		"127.0.0.1:1234": &common.SpanMetrics{
+			Written: uint64(len(SIMPLE_TEST_SPANS)),
+		},
+	})
 	testQuery(t, ht, &common.Query{
 		Predicates: []common.Predicate{
 			common.Predicate{
@@ -320,6 +351,9 @@ func TestQueries4(t *testing.T) {
 
 func BenchmarkDatastoreWrites(b *testing.B) {
 	htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
+		Cnf: map[string]string{
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
 		WrittenSpans: make(chan *common.Span, b.N)}
 	ht, err := htraceBld.Build()
 	if err != nil {
@@ -331,22 +365,28 @@ func BenchmarkDatastoreWrites(b *testing.B) {
 	// Write many random spans.
 	for n := 0; n < b.N; n++ {
 		span := test.NewRandomSpan(rnd, allSpans[0:n])
-		ht.Store.WriteSpan(span)
+		ht.Store.WriteSpan(&IncomingSpan{
+			Addr: "127.0.0.1:1234",
+			Span: span,
+		})
 		allSpans[n] = span
 	}
 	// Wait for all the spans to be written.
 	for n := 0; n < b.N; n++ {
 		<-ht.Store.WrittenSpans
 	}
-	spansWritten := ht.Store.GetStatistics().NumSpansWritten
-	if spansWritten < uint64(b.N) {
-		b.Fatal("incorrect statistics: expected %d spans to be written, but only got %d",
-			b.N, spansWritten)
-	}
+	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+		"127.0.0.1:1234": &common.SpanMetrics{
+			Written: uint64(b.N), // should be less than?
+		},
+	})
 }
 
 func TestReloadDataStore(t *testing.T) {
 	htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
+		Cnf: map[string]string{
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
 		DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
 	ht, err := htraceBld.Build()
 	if err != nil {
@@ -444,6 +484,9 @@ func TestReloadDataStore(t *testing.T) {
 func TestQueriesWithContinuationTokens1(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1",
+		Cnf: map[string]string{
+			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+		},
 		WrittenSpans: make(chan *common.Span, 100)}
 	ht, err := htraceBld.Build()
 	if err != nil {
@@ -451,9 +494,11 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
-		t.Fatal()
-	}
+	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
+		"127.0.0.1:1234": &common.SpanMetrics{
+			Written: uint64(len(SIMPLE_TEST_SPANS)),
+		},
+	})
 	// Adding a prev value to this query excludes the first result that we
 	// would normally get.
 	testQuery(t, ht, &common.Query{

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
new file mode 100644
index 0000000..140b50d
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"org/apache/htrace/common"
+	"time"
+)
+
+type Heartbeater struct {
+	// The name of this heartbeater
+	name string
+
+	// How long to sleep between heartbeats, in milliseconds.
+	periodMs int64
+
+	// The logger to use.
+	lg *common.Logger
+
+	// The channels to send the heartbeat on.
+	targets []HeartbeatTarget
+
+	// Incoming requests to the heartbeater.  When this is closed, the
+	// heartbeater will exit.
+	req chan *HeartbeatTarget
+}
+
+type HeartbeatTarget struct {
+	// The name of the heartbeat target.
+	name string
+
+	// The channel for the heartbeat target.
+	targetChan chan interface{}
+}
+
+func (tgt *HeartbeatTarget) String() string {
+	return tgt.name
+}
+
+func NewHeartbeater(name string, periodMs int64, lg *common.Logger) *Heartbeater {
+	hb := &Heartbeater{
+		name:     name,
+		periodMs: periodMs,
+		lg:       lg,
+		targets:  make([]HeartbeatTarget, 0, 4),
+		req:      make(chan *HeartbeatTarget),
+	}
+	go hb.run()
+	return hb
+}
+
+func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) {
+	hb.req <- tgt
+}
+
+func (hb *Heartbeater) Shutdown() {
+	close(hb.req)
+}
+
+func (hb *Heartbeater) String() string {
+	return hb.name
+}
+
+func (hb *Heartbeater) run() {
+	period := time.Duration(hb.periodMs) * time.Millisecond
+	for {
+		periodEnd := time.Now().Add(period)
+		for {
+			timeToWait := periodEnd.Sub(time.Now())
+			if timeToWait <= 0 {
+				break
+			} else if timeToWait > period {
+				// Smooth over jitter or clock changes
+				timeToWait = period
+				periodEnd = time.Now().Add(period)
+			}
+			select {
+			case tgt, open := <-hb.req:
+				if !open {
+					hb.lg.Debugf("%s: exiting.\n", hb.String())
+					return
+				}
+				hb.targets = append(hb.targets, *tgt)
+				hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String())
+			case <-time.After(timeToWait):
+			}
+		}
+		for targetIdx := range hb.targets {
+			select {
+			case hb.targets[targetIdx].targetChan <- nil:
+			default:
+				// We failed to send a heartbeat because the other goroutine was busy and
+				// hasn't cleared the previous one from its channel.  This could indicate a
+				// stuck goroutine.
+				hb.lg.Infof("%s: could not send heartbeat to %s.\n",
+					hb.String(), hb.targets[targetIdx])
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
new file mode 100644
index 0000000..cbde7fc
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+	"testing"
+	"time"
+)
+
+func TestHeartbeaterStartupShutdown(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	lg := common.NewLogger("heartbeater", cnf)
+	hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
+	if hb.String() != "ExampleHeartbeater" {
+		t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+	}
+	hb.Shutdown()
+}
+
+// Parameters for TestHeartbeaterSendsHeartbeats.
+const (
+	// HEARTBEATER_PERIOD is the number of milliseconds between heartbeats.
+	HEARTBEATER_PERIOD = 5
+
+	// NUM_TEST_HEARTBEATS is the number of heartbeats to send in the test.
+	NUM_TEST_HEARTBEATS = 3
+)
+
+// TestHeartbeaterSendsHeartbeats repeats the heartbeat delivery check until a
+// run takes strictly longer than NUM_TEST_HEARTBEATS heartbeat periods.
+func TestHeartbeaterSendsHeartbeats(t *testing.T) {
+	bld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := bld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	lg := common.NewLogger("heartbeater", cnf)
+	// The minimum amount of time which the heartbeater test should take.
+	minDuration := time.Duration(NUM_TEST_HEARTBEATS*HEARTBEATER_PERIOD) * time.Millisecond
+	for {
+		start := time.Now()
+		testHeartbeaterSendsHeartbeatsImpl(t, lg)
+		elapsed := time.Since(start)
+		lg.Debugf("Measured duration: %v; minimum expected duration: %v\n",
+			elapsed, minDuration)
+		if elapsed > minDuration {
+			return
+		}
+	}
+}
+
+// testHeartbeaterSendsHeartbeatsImpl registers one target with a new
+// Heartbeater, waits until NUM_TEST_HEARTBEATS heartbeats have reached it, and
+// shuts the Heartbeater down (on the failure path too).
+func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
+	hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
+	defer hb.Shutdown()
+	if hb.String() != "ExampleHeartbeater" {
+		t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+	}
+	testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
+	hb.AddHeartbeatTarget(&HeartbeatTarget{
+		name:       "ExampleHeartbeatTarget",
+		targetChan: testChan,
+	})
+	// Receive the heartbeats on this goroutine.  The Heartbeater never blocks
+	// when sending (it logs and moves on if testChan is full), so nothing has
+	// to keep draining testChan afterwards.  testChan is never closed, so a
+	// helper goroutine still reading from it would leak.
+	for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+		<-testChan
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 354d064..49587bb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -32,6 +32,7 @@ import (
 	"net/rpc"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
+	"reflect"
 )
 
 // Handles HRPC calls
@@ -109,9 +110,10 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
 }
 
 func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+	remoteAddr := cdc.conn.RemoteAddr()
 	if cdc.lg.TraceEnabled() {
 		cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
-			cdc.length, cdc.conn.RemoteAddr())
+			cdc.length, remoteAddr)
 	}
 	mh := new(codec.MsgpackHandle)
 	mh.WriteExt = true
@@ -119,11 +121,16 @@ func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
 	err := dec.Decode(body)
 	if err != nil {
 		return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read request "+
-			"body from %s: %s", cdc.conn.RemoteAddr(), err.Error()))
+			"body from %s: %s", remoteAddr, err.Error()))
 	}
 	if cdc.lg.TraceEnabled() {
 		cdc.lg.Tracef("Read body from %s: %s\n",
-			cdc.conn.RemoteAddr(), asJson(&body))
+			remoteAddr, asJson(&body))
+	}
+	val := reflect.ValueOf(body)
+	addr := val.Elem().FieldByName("Addr")
+	if addr.IsValid() {
+		addr.SetString(remoteAddr.String())
 	}
 	return nil
 }
@@ -203,7 +210,10 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
 		if hand.lg.TraceEnabled() {
 			hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
 		}
-		hand.store.WriteSpan(span)
+		hand.store.WriteSpan(&IncomingSpan{
+			Addr: req.Addr,
+			Span: span,
+		})
 	}
 	return nil
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
new file mode 100644
index 0000000..672f5f6
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"encoding/json"
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+	"sync"
+)
+
+//
+// The Metrics Sink for HTraced.
+//
+// The Metrics sink keeps track of metrics for the htraced daemon.
+// It is important to have good metrics so that we can properly manage htraced.  In particular, we
+// need to know the rate at which we are receiving spans and where those spans come from.  If spans
+// were dropped because of a high sampling rate, we need to know which part of the system dropped
+// them so that we can adjust the sampling rate there.
+//
+
+// ServerSpanMetrics holds the span counters that htraced maintains for a
+// single source.
+type ServerSpanMetrics struct {
+	// The total number of spans written to HTraced.
+	Written uint64
+
+	// The total number of spans dropped by the server.
+	ServerDropped uint64
+}
+
+// Clone returns a newly allocated copy of spm.
+func (spm *ServerSpanMetrics) Clone() *ServerSpanMetrics {
+	dup := *spm
+	return &dup
+}
+
+// String renders spm as a JSON object.  It panics if marshaling fails.
+func (spm *ServerSpanMetrics) String() string {
+	jbytes, err := json.Marshal(*spm)
+	if err == nil {
+		return string(jbytes)
+	}
+	panic(err)
+}
+
+// Add accumulates ospm's counters into spm; ospm itself is left unchanged.
+func (spm *ServerSpanMetrics) Add(ospm *ServerSpanMetrics) {
+	spm.ServerDropped += ospm.ServerDropped
+	spm.Written += ospm.Written
+}
+
+// Clear resets every counter in spm to zero.
+func (spm *ServerSpanMetrics) Clear() {
+	*spm = ServerSpanMetrics{}
+}
+
+// A map from network address strings to ServerSpanMetrics structures.
+type ServerSpanMetricsMap map[string]*ServerSpanMetrics
+
+// getOrCreate returns the metrics for addr, inserting a zeroed entry first if
+// addr has none yet.
+func (smtxMap ServerSpanMetricsMap) getOrCreate(addr string) *ServerSpanMetrics {
+	mtx := smtxMap[addr]
+	if mtx == nil {
+		mtx = &ServerSpanMetrics{}
+		smtxMap[addr] = mtx
+	}
+	return mtx
+}
+
+// IncrementDropped counts one span dropped by the server for addr, then
+// prunes the map so that it holds at most maxMtx addresses.
+func (smtxMap ServerSpanMetricsMap) IncrementDropped(addr string, maxMtx int,
+	lg *common.Logger) {
+	smtxMap.getOrCreate(addr).ServerDropped++
+	smtxMap.Prune(maxMtx, lg)
+}
+
+// IncrementWritten counts one span written for addr, then prunes the map so
+// that it holds at most maxMtx addresses.
+func (smtxMap ServerSpanMetricsMap) IncrementWritten(addr string, maxMtx int,
+	lg *common.Logger) {
+	smtxMap.getOrCreate(addr).Written++
+	smtxMap.Prune(maxMtx, lg)
+}
+
+// Prune evicts entries until the map holds at most maxMtx addresses, logging
+// a warning for each eviction.  Go map iteration order is unspecified, so the
+// evicted entries are arbitrary; one may be the entry that was just updated.
+//
+// The comparison is ">" so that maxMtx entries may actually be kept, which
+// matches the "more than %d addrs" message.  Looping, rather than deleting a
+// single entry, also handles callers that add several addresses before
+// pruning.
+func (smtxMap ServerSpanMetricsMap) Prune(maxMtx int, lg *common.Logger) {
+	for k := range smtxMap {
+		if len(smtxMap) <= maxMtx {
+			return
+		}
+		lg.Warnf("Evicting metrics entry for addr %s "+
+			"because there are more than %d addrs.\n", k, maxMtx)
+		// Deleting the current key during range iteration is permitted.
+		delete(smtxMap, k)
+	}
+}
+
+// AccessReq asks the MetricsSink service goroutine for a snapshot of the
+// current totals.  The service goroutine fills mtxMap with one entry per
+// address and then closes done; see handleAccessReq and AccessTotals.
+type AccessReq struct {
+	mtxMap common.SpanMetricsMap
+	done   chan interface{}
+}
+
+// MetricsSink aggregates per-address span metrics for htraced.  Totals are
+// merged and read on a dedicated service goroutine (see run), while
+// client-reported drop counts are kept separately under clientDroppedLock.
+type MetricsSink struct {
+	// The total span metrics, keyed by address.  In the code shown here it is
+	// only accessed from the service goroutine.
+	smtxMap ServerSpanMetricsMap
+
+	// A channel of incoming shard metrics.
+	// When this is closed, the MetricsSink will exit.
+	updateReqs chan ServerSpanMetricsMap
+
+	// A channel of incoming requests for a snapshot of the totals.
+	accessReqs chan *AccessReq
+
+	// This will be closed when the MetricsSink has exited.
+	exited chan interface{}
+
+	// The logger used by this MetricsSink.
+	lg *common.Logger
+
+	// The maximum number of metrics totals we will maintain.
+	maxMtx int
+
+	// The number of spans which each client has self-reported that it has
+	// dropped.
+	clientDroppedMap map[string]uint64
+
+	// Lock protecting clientDroppedMap.
+	clientDroppedLock sync.Mutex
+}
+
+// NewMetricsSink builds a MetricsSink from cnf, where the address cap comes
+// from HTRACE_METRICS_MAX_ADDR_ENTRIES, and starts its service goroutine.
+// Call Shutdown to stop it.
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+	msink := &MetricsSink{
+		smtxMap:          make(ServerSpanMetricsMap),
+		updateReqs:       make(chan ServerSpanMetricsMap, 128),
+		accessReqs:       make(chan *AccessReq),
+		exited:           make(chan interface{}),
+		lg:               common.NewLogger("metrics", cnf),
+		maxMtx:           cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
+		clientDroppedMap: make(map[string]uint64),
+	}
+	go msink.run()
+	return msink
+}
+
+func (msink *MetricsSink) run() {
+	lg := msink.lg
+	defer func() {
+		lg.Info("MetricsSink: stopping service goroutine.\n")
+		close(msink.exited)
+	}()
+	lg.Tracef("MetricsSink: starting.\n")
+	for {
+		select {
+		case updateReq, open := <-msink.updateReqs:
+			if !open {
+				lg.Trace("MetricsSink: shutting down cleanly.\n")
+				return
+			}
+			for addr, umtx := range updateReq {
+				smtx := msink.smtxMap[addr]
+				if smtx == nil {
+					smtx = &ServerSpanMetrics{}
+					msink.smtxMap[addr] = smtx
+				}
+				smtx.Add(umtx)
+				if lg.TraceEnabled() {
+					lg.Tracef("MetricsSink: updated %s to %s\n", addr, smtx.String())
+				}
+			}
+			msink.smtxMap.Prune(msink.maxMtx, lg)
+		case accessReq := <-msink.accessReqs:
+			msink.handleAccessReq(accessReq)
+		}
+	}
+}
+
+// handleAccessReq copies the current totals into accessReq.mtxMap, adding
+// each address's client-reported dropped count, then closes accessReq.done to
+// release the waiting caller.  clientDroppedLock is held while copying
+// because UpdateClientDropped may modify clientDroppedMap concurrently.
+// Addresses that appear only in clientDroppedMap are not reported.
+func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
+	msink.lg.Debug("MetricsSink: accessing global metrics.\n")
+	msink.clientDroppedLock.Lock()
+	// Deferred calls run last-in-first-out: unlock first, then signal done.
+	defer close(accessReq.done)
+	defer msink.clientDroppedLock.Unlock()
+	out := accessReq.mtxMap
+	for addr, smtx := range msink.smtxMap {
+		out[addr] = &common.SpanMetrics{
+			Written:       smtx.Written,
+			ServerDropped: smtx.ServerDropped,
+			ClientDropped: msink.clientDroppedMap[addr],
+		}
+	}
+}
+
+// AccessTotals returns a snapshot of the per-address span metrics, assembled
+// by the service goroutine.  If that goroutine has already exited (after
+// Shutdown), an empty map is returned.  Without that check the call would
+// block forever, since nothing receives on the unbuffered accessReqs channel
+// any more.
+func (msink *MetricsSink) AccessTotals() common.SpanMetricsMap {
+	accessReq := &AccessReq{
+		mtxMap: make(common.SpanMetricsMap),
+		done:   make(chan interface{}),
+	}
+	select {
+	case msink.accessReqs <- accessReq:
+	case <-msink.exited:
+		return accessReq.mtxMap
+	}
+	// run() handles the request synchronously once it has received it, so
+	// done is guaranteed to be closed.
+	<-accessReq.done
+	return accessReq.mtxMap
+}
+
+// UpdateMetrics queues mtxMap so that the service goroutine adds it to the
+// running totals.  updateReqs is buffered (128 entries in NewMetricsSink), so
+// this only blocks when that buffer is full.  The service goroutine reads
+// mtxMap later, so callers should not modify it after handing it over.
+// Calling this after Shutdown panics, because updateReqs is closed by then.
+func (msink *MetricsSink) UpdateMetrics(mtxMap ServerSpanMetricsMap) {
+	reqs := msink.updateReqs
+	reqs <- mtxMap
+}
+
+// Shutdown closes updateReqs, which makes the service goroutine return, and
+// waits until it has done so.  Calling it twice panics (a closed channel
+// cannot be closed again).
+func (msink *MetricsSink) Shutdown() {
+	updates, exited := msink.updateReqs, msink.exited
+	close(updates)
+	<-exited // run closes exited as it returns
+}
+
+// UpdateClientDropped records clientDropped as the number of spans that
+// client has self-reported as dropped, replacing any earlier value.  To bound
+// memory, arbitrary entries are evicted while more than maxMtx clients are
+// tracked.  The evicted entry may be the one just written, because map
+// iteration order is unspecified.  clientDroppedLock guards the map, and
+// handleAccessReq takes the same lock.
+//
+// The comparison is ">" so that up to maxMtx clients are actually kept,
+// consistent with maxMtx being the maximum number of entries.
+func (msink *MetricsSink) UpdateClientDropped(client string, clientDropped uint64) {
+	msink.clientDroppedLock.Lock()
+	defer msink.clientDroppedLock.Unlock()
+	msink.clientDroppedMap[client] = clientDropped
+	for k := range msink.clientDroppedMap {
+		if len(msink.clientDroppedMap) <= msink.maxMtx {
+			return
+		}
+		delete(msink.clientDroppedMap, k)
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
new file mode 100644
index 0000000..c90d1da
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
+	"reflect"
+	"testing"
+	"time"
+)
+
+// TestMetricsSinkStartupShutdown checks that a MetricsSink can be started and
+// then shut down cleanly.
+func TestMetricsSinkStartupShutdown(t *testing.T) {
+	bld := &conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := bld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	NewMetricsSink(cnf).Shutdown()
+}
+
+// TestAddSpanMetrics checks that ServerSpanMetrics.Add sums both counters
+// into the receiver and leaves its argument untouched.
+func TestAddSpanMetrics(t *testing.T) {
+	a := &ServerSpanMetrics{Written: 100, ServerDropped: 200}
+	b := &ServerSpanMetrics{Written: 500, ServerDropped: 100}
+	a.Add(b)
+	checks := []struct {
+		ok  bool
+		msg string
+	}{
+		{a.Written == 600, "SpanMetrics#Add failed to update #Written"},
+		{a.ServerDropped == 300, "SpanMetrics#Add failed to update #Dropped"},
+		{b.Written == 500, "SpanMetrics#Add updated b#Written"},
+		{b.ServerDropped == 100, "SpanMetrics#Add updated b#Dropped"},
+	}
+	for _, c := range checks {
+		if !c.ok {
+			t.Fatal(c.msg)
+		}
+	}
+}
+
+// compareTotals reports whether, for every key present in either map, a and b
+// hold deeply-equal values.  A key missing from one map is looked up there as
+// the map's zero value, so it only matches a nil entry in the other map.
+func compareTotals(a, b common.SpanMetricsMap) bool {
+	coveredBy := func(src, dst common.SpanMetricsMap) bool {
+		for key, val := range src {
+			if !reflect.DeepEqual(val, dst[key]) {
+				return false
+			}
+		}
+		return true
+	}
+	return coveredBy(a, b) && coveredBy(b, a)
+}
+
+// waitForMetrics polls msink about once per millisecond (sleeping before each
+// check) until its totals match expectedTotals.  It has no timeout of its own.
+func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
+	for matched := false; !matched; {
+		time.Sleep(1 * time.Millisecond)
+		matched = compareTotals(msink.AccessTotals(), expectedTotals)
+	}
+}
+
+// TestMetricsSinkMessages sends a sequence of metric updates to a MetricsSink.
+// After each group of updates it waits until the reported per-address totals
+// equal the running sums of everything sent so far.
+func TestMetricsSinkMessages(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	msink := NewMetricsSink(cnf)
+	if initial := msink.AccessTotals(); len(initial) != 0 {
+		t.Fatalf("Expected no data in the MetricsSink to start with.")
+	}
+	steps := []struct {
+		updates []ServerSpanMetricsMap
+		want    common.SpanMetricsMap
+	}{
+		{
+			updates: []ServerSpanMetricsMap{
+				{"192.168.0.100": &ServerSpanMetrics{Written: 20, ServerDropped: 10}},
+			},
+			want: common.SpanMetricsMap{
+				"192.168.0.100": &common.SpanMetrics{Written: 20, ServerDropped: 10},
+			},
+		},
+		{
+			updates: []ServerSpanMetricsMap{
+				{"192.168.0.100": &ServerSpanMetrics{Written: 200, ServerDropped: 100}},
+				{"192.168.0.100": &ServerSpanMetrics{Written: 1000, ServerDropped: 1000}},
+			},
+			want: common.SpanMetricsMap{
+				"192.168.0.100": &common.SpanMetrics{Written: 1220, ServerDropped: 1110},
+			},
+		},
+		{
+			updates: []ServerSpanMetricsMap{
+				{"192.168.0.200": &ServerSpanMetrics{Written: 200, ServerDropped: 100}},
+			},
+			want: common.SpanMetricsMap{
+				"192.168.0.100": &common.SpanMetrics{Written: 1220, ServerDropped: 1110},
+				"192.168.0.200": &common.SpanMetrics{Written: 200, ServerDropped: 100},
+			},
+		},
+	}
+	for _, step := range steps {
+		for _, update := range step.updates {
+			msink.UpdateMetrics(update)
+		}
+		waitForMetrics(msink, step.want)
+	}
+	msink.Shutdown()
+}
+
+// TestMetricsSinkMessagesEviction caps the sink at two addresses, sends
+// metrics for three, and waits until exactly two remain.
+func TestMetricsSinkMessagesEviction(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
+	cnfBld.Values[conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS] = "1"
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	msink := NewMetricsSink(cnf)
+	update := make(ServerSpanMetricsMap)
+	for _, addr := range []string{"192.168.0.100", "192.168.0.101", "192.168.0.102"} {
+		update[addr] = &ServerSpanMetrics{
+			Written:       20,
+			ServerDropped: 10,
+		}
+	}
+	msink.UpdateMetrics(update)
+	for len(msink.AccessTotals()) != 2 {
+		// Keep polling until the update and eviction have been applied.
+	}
+	msink.Shutdown()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index a54f2cb..c2300c4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -90,11 +90,15 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
 			}
 		}
 	}
+	// Copy the default test configuration values.
+	for k, v := range conf.TEST_VALUES() {
+		_, hasVal := bld.Cnf[k]
+		if !hasVal {
+			bld.Cnf[k] = v
+		}
+	}
 	bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
 		strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
-	bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0"  // use a random port for the REST server
-	bld.Cnf[conf.HTRACE_HRPC_ADDRESS] = ":0" // use a random port for the HRPC server
-	bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE"
 	cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
 	cnf, err := cnfBld.Build()
 	if err != nil {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c28fc60d/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 16c3a75..eca3f08 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -90,7 +90,7 @@ func (hand *serverStatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
 
 type serverConfHandler struct {
 	cnf *conf.Config
-	lg *common.Logger
+	lg  *common.Logger
 }
 
 func (hand *serverConfHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -233,9 +233,13 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
 		if spanIdProblem != "" {
 			hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
 		} else {
-			hand.store.WriteSpan(span)
+			hand.store.WriteSpan(&IncomingSpan{
+				Addr: req.RemoteAddr,
+				Span: span,
+			})
 		}
 	}
+	hand.store.msink.UpdateClientDropped(req.RemoteAddr, msg.ClientDropped)
 }
 
 type queryHandler struct {


Mime
View raw message