htrace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iwasak...@apache.org
Subject [3/7] incubator-htrace git commit: HTRACE-357. Rename htrace-htraced/go/src/org/apache/htrace to htrace-htraced/go/src/htrace (Colin Patrick McCabe via iwasakims)
Date Tue, 19 Apr 2016 23:32:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
deleted file mode 100644
index 11e2733..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package common
-
-import (
-	"testing"
-)
-
-func testRoundTrip(t *testing.T, u int64) {
-	tme := UnixMsToTime(u)
-	u2 := TimeToUnixMs(tme)
-	if u2 != u {
-		t.Fatalf("Error taking %d on a round trip: came back as "+
-			"%d instead.\n", u, u2)
-	}
-}
-
-func TestTimeConversions(t *testing.T) {
-	testRoundTrip(t, 0)
-	testRoundTrip(t, 1445540632000)
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config.go b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
deleted file mode 100644
index 24170b2..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config.go
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package conf
-
-import (
-	"bufio"
-	"bytes"
-	"fmt"
-	"io"
-	"log"
-	"os"
-	"path/filepath"
-	"sort"
-	"strconv"
-	"strings"
-	"syscall"
-)
-
-//
-// The configuration code for HTraced.
-//
-// HTraced can be configured via Hadoop-style XML configuration files, or by passing -Dkey=value
-// command line arguments.  Command-line arguments without an equals sign, such as "-Dkey", will be
-// treated as setting the key to "true".
-//
-// Configuration key constants should be defined in config_keys.go.  Each key should have a default,
-// which will be used if the user supplies no value, or supplies an invalid value.
-// For that reason, it is not necessary for the Get, GetInt, etc. functions to take a default value
-// argument.
-//
-// Configuration objects are immutable.  However, you can make a copy of a configuration which adds
-// some changes using Configuration#Clone().
-//
-
-type Config struct {
-	settings map[string]string
-	defaults map[string]string
-}
-
-type Builder struct {
-	// If non-nil, the XML configuration file to read.
-	Reader io.Reader
-
-	// If non-nil, the configuration values to use.
-	Values map[string]string
-
-	// If non-nil, the default configuration values to use.
-	Defaults map[string]string
-
-	// If non-nil, the command-line arguments to use.
-	Argv []string
-
-	// The name of the application.  Configuration keys that start with this
-	// string will be converted to their unprefixed forms.
-	AppPrefix string
-}
-
-func getDefaultHTracedConfDir() string {
-	return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf"
-}
-
-func getHTracedConfDirs(dlog io.Writer) []string {
-	confDir := os.Getenv("HTRACED_CONF_DIR")
-	paths := filepath.SplitList(confDir)
-	if len(paths) < 1 {
-		def := getDefaultHTracedConfDir()
-		io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR defaulting to %s\n", def))
-		return []string{def}
-	}
-	io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR=%s\n", confDir))
-	return paths
-}
-
-// Load a configuration from the application's argv, configuration file, and the standard
-// defaults.
-func LoadApplicationConfig(appPrefix string) (*Config, io.Reader) {
-	dlog := new(bytes.Buffer)
-	reader := openFile(CONFIG_FILE_NAME, getHTracedConfDirs(dlog), dlog)
-	bld := Builder{}
-	if reader != nil {
-		defer reader.Close()
-		bld.Reader = bufio.NewReader(reader)
-	}
-	bld.Argv = os.Args[1:]
-	bld.Defaults = DEFAULTS
-	bld.AppPrefix = appPrefix
-	cnf, err := bld.Build()
-	if err != nil {
-		log.Fatal("Error building configuration: " + err.Error())
-	}
-	os.Args = append(os.Args[0:1], bld.Argv...)
-	keys := make(sort.StringSlice, 0, 20)
-	for k, _ := range cnf.settings {
-		keys = append(keys, k)
-	}
-	sort.Sort(keys)
-	prefix := ""
-	io.WriteString(dlog, "Read configuration: ")
-	for i := range keys {
-		io.WriteString(dlog, fmt.Sprintf(`%s%s = "%s"`,
-			prefix, keys[i], cnf.settings[keys[i]]))
-		prefix = ", "
-	}
-	return cnf, dlog
-}
-
-// Attempt to open a configuration file somewhere on the provided list of paths.
-func openFile(cnfName string, paths []string, dlog io.Writer) io.ReadCloser {
-	for p := range paths {
-		path := fmt.Sprintf("%s%c%s", paths[p], os.PathSeparator, cnfName)
-		file, err := os.Open(path)
-		if err == nil {
-			io.WriteString(dlog, fmt.Sprintf("Reading configuration from %s.\n", path))
-			return file
-		}
-		if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOENT {
-			continue
-		}
-		io.WriteString(dlog, fmt.Sprintf("Error opening %s for read: %s\n", path, err.Error()))
-	}
-	return nil
-}
-
-// Try to parse a command-line element as a key=value pair.
-func parseAsConfigFlag(flag string) (string, string) {
-	var confPart string
-	if strings.HasPrefix(flag, "-D") {
-		confPart = flag[2:]
-	} else if strings.HasPrefix(flag, "--D") {
-		confPart = flag[3:]
-	} else {
-		return "", ""
-	}
-	if len(confPart) == 0 {
-		return "", ""
-	}
-	idx := strings.Index(confPart, "=")
-	if idx == -1 {
-		return confPart, "true"
-	}
-	return confPart[0:idx], confPart[idx+1:]
-}
-
-// Build a new configuration object from the provided conf.Builder.
-func (bld *Builder) Build() (*Config, error) {
-	// Load values and defaults
-	cnf := Config{}
-	cnf.settings = make(map[string]string)
-	if bld.Values != nil {
-		for k, v := range bld.Values {
-			cnf.settings[k] = v
-		}
-	}
-	cnf.defaults = make(map[string]string)
-	if bld.Defaults != nil {
-		for k, v := range bld.Defaults {
-			cnf.defaults[k] = v
-		}
-	}
-
-	// Process the configuration file, if we have one
-	if bld.Reader != nil {
-		parseXml(bld.Reader, cnf.settings)
-	}
-
-	// Process command line arguments
-	var i int
-	for i < len(bld.Argv) {
-		str := bld.Argv[i]
-		key, val := parseAsConfigFlag(str)
-		if key != "" {
-			cnf.settings[key] = val
-			bld.Argv = append(bld.Argv[:i], bld.Argv[i+1:]...)
-		} else {
-			i++
-		}
-	}
-	cnf.settings = bld.removeApplicationPrefixes(cnf.settings)
-	cnf.defaults = bld.removeApplicationPrefixes(cnf.defaults)
-	return &cnf, nil
-}
-
-func (bld *Builder) removeApplicationPrefixes(in map[string]string) map[string]string {
-	out := make(map[string]string)
-	for k, v := range in {
-		if strings.HasPrefix(k, bld.AppPrefix) {
-			out[k[len(bld.AppPrefix):]] = v
-		} else {
-			out[k] = v
-		}
-	}
-	return out
-}
-
-// Returns true if the configuration has a non-default value for the given key.
-func (cnf *Config) Contains(key string) bool {
-	_, ok := cnf.settings[key]
-	return ok
-}
-
-// Get a string configuration key.
-func (cnf *Config) Get(key string) string {
-	ret, hadKey := cnf.settings[key]
-	if hadKey {
-		return ret
-	}
-	return cnf.defaults[key]
-}
-
-// Get a boolean configuration key.
-func (cnf *Config) GetBool(key string) bool {
-	str := cnf.settings[key]
-	ret, err := strconv.ParseBool(str)
-	if err == nil {
-		return ret
-	}
-	str = cnf.defaults[key]
-	ret, err = strconv.ParseBool(str)
-	if err == nil {
-		return ret
-	}
-	return false
-}
-
-// Get an integer configuration key.
-func (cnf *Config) GetInt(key string) int {
-	str := cnf.settings[key]
-	ret, err := strconv.Atoi(str)
-	if err == nil {
-		return ret
-	}
-	str = cnf.defaults[key]
-	ret, err = strconv.Atoi(str)
-	if err == nil {
-		return ret
-	}
-	return 0
-}
-
-// Get an int64 configuration key.
-func (cnf *Config) GetInt64(key string) int64 {
-	str := cnf.settings[key]
-	ret, err := strconv.ParseInt(str, 10, 64)
-	if err == nil {
-		return ret
-	}
-	str = cnf.defaults[key]
-	ret, err = strconv.ParseInt(str, 10, 64)
-	if err == nil {
-		return ret
-	}
-	return 0
-}
-
-// Make a deep copy of the given configuration.
-// Optionally, you can specify particular key/value pairs to change.
-// Example:
-// cnf2 := cnf.Copy("my.changed.key", "my.new.value")
-func (cnf *Config) Clone(args ...string) *Config {
-	if len(args)%2 != 0 {
-		panic("The arguments to Config#copy are key1, value1, " +
-			"key2, value2, and so on.  You must specify an even number of arguments.")
-	}
-	ncnf := &Config{defaults: cnf.defaults}
-	ncnf.settings = make(map[string]string)
-	for k, v := range cnf.settings {
-		ncnf.settings[k] = v
-	}
-	for i := 0; i < len(args); i += 2 {
-		ncnf.settings[args[i]] = args[i+1]
-	}
-	return ncnf
-}
-
-// Export the configuration as a map
-func (cnf *Config) Export() map[string]string {
-	m := make(map[string]string)
-	for k, v := range cnf.defaults {
-		m[k] = v
-	}
-	for k, v := range cnf.settings {
-		m[k] = v
-	}
-	return m
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
deleted file mode 100644
index 16790d8..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package conf
-
-import (
-	"fmt"
-	"os"
-)
-
-//
-// Configuration keys for HTrace.
-//
-
-// The platform-specific path separator.  Usually slash.
-var PATH_SEP string = fmt.Sprintf("%c", os.PathSeparator)
-
-// The platform-specific path list separator.  Usually colon.
-var PATH_LIST_SEP string = fmt.Sprintf("%c", os.PathListSeparator)
-
-// The name of the XML configuration file to look for.
-const CONFIG_FILE_NAME = "htraced-conf.xml"
-
-// An environment variable containing a list of paths to search for the
-// configuration file in.
-const HTRACED_CONF_DIR = "HTRACED_CONF_DIR"
-
-// The web address to start the REST server on.
-const HTRACE_WEB_ADDRESS = "web.address"
-
-// The default port for the Htrace web address.
-const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9096
-
-// The web address to start the REST server on.
-const HTRACE_HRPC_ADDRESS = "hrpc.address"
-
-// The default port for the Htrace HRPC address.
-const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075
-
-// The directories to put the data store into.  Separated by PATH_LIST_SEP.
-const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories"
-
-// Boolean key which indicates whether we should clear data on startup.
-const HTRACE_DATA_STORE_CLEAR = "data.store.clear"
-
-// How many writes to buffer before applying backpressure to span senders.
-const HTRACE_DATA_STORE_SPAN_BUFFER_SIZE = "data.store.span.buffer.size"
-
-// Path to put the logs from htrace, or the empty string to use stdout.
-const HTRACE_LOG_PATH = "log.path"
-
-// The log level to use for the logs in htrace.
-const HTRACE_LOG_LEVEL = "log.level"
-
-// The period between datastore heartbeats.  This is the approximate interval at which we will
-// prune expired spans.
-const HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS = "datastore.heartbeat.period.ms"
-
-// The maximum number of addresses for which we will maintain metrics.
-const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
-
-// The number of milliseconds we should keep spans before discarding them.
-const HTRACE_SPAN_EXPIRY_MS = "span.expiry.ms"
-
-// The period between updates to the span reaper
-const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms"
-
-// A host:port pair to send information to on startup.  This is used in unit
-// tests to determine the (random) port of the htraced process that has been
-// started.
-const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address"
-
-// The maximum number of HRPC handler goroutines we will create at once.  If
-// this is too small, we won't get enough concurrency; if it's too big, we will
-// buffer too much data in memory while waiting for the datastore to process
-// requests.
-const HTRACE_NUM_HRPC_HANDLERS = "num.hrpc.handlers"
-
-// The I/O timeout HRPC will use, in milliseconds.  If it takes longer than
-// this to read or write a message, we will abort the connection.
-const HTRACE_HRPC_IO_TIMEOUT_MS = "hrpc.io.timeout.ms"
-
-// The leveldb write buffer size, or 0 to use the library default, which is 4
-// MB in leveldb 1.16.  See leveldb's options.h for more details.
-const HTRACE_LEVELDB_WRITE_BUFFER_SIZE = "leveldb.write.buffer.size"
-
-// The LRU cache size for leveldb, in bytes.
-const HTRACE_LEVELDB_CACHE_SIZE = "leveldb.cache.size"
-
-// Default values for HTrace configuration keys.
-var DEFAULTS = map[string]string{
-	HTRACE_WEB_ADDRESS:  fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
-	HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_HRPC_ADDRESS_DEFAULT_PORT),
-	HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" +
-		PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2",
-	HTRACE_DATA_STORE_CLEAR:              "false",
-	HTRACE_DATA_STORE_SPAN_BUFFER_SIZE:   "100",
-	HTRACE_LOG_PATH:                      "",
-	HTRACE_LOG_LEVEL:                     "INFO",
-	HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
-	HTRACE_METRICS_MAX_ADDR_ENTRIES:      "100000",
-	HTRACE_SPAN_EXPIRY_MS:                "0",
-	HTRACE_REAPER_HEARTBEAT_PERIOD_MS:    fmt.Sprintf("%d", 90*1000),
-	HTRACE_NUM_HRPC_HANDLERS:             "20",
-	HTRACE_HRPC_IO_TIMEOUT_MS:            "60000",
-	HTRACE_LEVELDB_WRITE_BUFFER_SIZE:     "0",
-	HTRACE_LEVELDB_CACHE_SIZE:            fmt.Sprintf("%d", 100 * 1024 * 1024),
-}
-
-// Values to be used when creating test configurations
-func TEST_VALUES() map[string]string {
-	return map[string]string{
-		HTRACE_HRPC_ADDRESS:   ":0",    // use a random port for the HRPC server
-		HTRACE_LOG_LEVEL:      "TRACE", // show all log messages in tests
-		HTRACE_WEB_ADDRESS:    ":0",    // use a random port for the REST server
-		HTRACE_SPAN_EXPIRY_MS: "0",     // never time out spans (unless testing the reaper)
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go
deleted file mode 100644
index a681136..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package conf
-
-import (
-	"bytes"
-	"os"
-	"strings"
-	"testing"
-)
-
-// Test that parsing command-line arguments of the form -Dfoo=bar works.
-func TestParseArgV(t *testing.T) {
-	t.Parallel()
-	argv := []string{"-Dfoo=bar", "-Dbaz=123", "-DsillyMode", "-Dlog.path="}
-	bld := &Builder{Argv: argv,
-		Defaults:map[string]string {
-			"log.path": "/log/path/default",
-		}}
-	cnf, err := bld.Build()
-	if err != nil {
-		t.Fatal()
-	}
-	if "bar" != cnf.Get("foo") {
-		t.Fatal()
-	}
-	if 123 != cnf.GetInt("baz") {
-		t.Fatal()
-	}
-	if !cnf.GetBool("sillyMode") {
-		t.Fatal()
-	}
-	if cnf.GetBool("otherSillyMode") {
-		t.Fatal()
-	}
-	if "" != cnf.Get("log.path") {
-		t.Fatal()
-	}
-}
-
-// Test that default values work.
-// Defaults are used only when the configuration option is not present or can't be parsed.
-func TestDefaults(t *testing.T) {
-	t.Parallel()
-	argv := []string{"-Dfoo=bar", "-Dbaz=invalidNumber"}
-	defaults := map[string]string{
-		"foo":  "notbar",
-		"baz":  "456",
-		"foo2": "4611686018427387904",
-	}
-	bld := &Builder{Argv: argv, Defaults: defaults}
-	cnf, err := bld.Build()
-	if err != nil {
-		t.Fatal()
-	}
-	if "bar" != cnf.Get("foo") {
-		t.Fatal()
-	}
-	if 456 != cnf.GetInt("baz") {
-		t.Fatal()
-	}
-	if 4611686018427387904 != cnf.GetInt64("foo2") {
-		t.Fatal()
-	}
-}
-
-// Test that we can parse our XML configuration file.
-func TestXmlConfigurationFile(t *testing.T) {
-	t.Parallel()
-	xml := `
-<?xml version="1.0"?>
-<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>
-<configuration>
-  <property>
-    <name>foo.bar</name>
-    <value>123</value>
-  </property>
-  <property>
-    <name>foo.baz</name>
-    <value>xmlValue</value>
-  </property>
-  <!--<property>
-    <name>commented.out</name>
-    <value>stuff</value>
-  </property>-->
-</configuration>
-`
-	xmlReader := strings.NewReader(xml)
-	argv := []string{"-Dfoo.bar=456"}
-	defaults := map[string]string{
-		"foo.bar":     "789",
-		"cmdline.opt": "4611686018427387904",
-	}
-	bld := &Builder{Argv: argv, Defaults: defaults, Reader: xmlReader}
-	cnf, err := bld.Build()
-	if err != nil {
-		t.Fatal()
-	}
-	// The command-line argument takes precedence over the XML and the defaults.
-	if 456 != cnf.GetInt("foo.bar") {
-		t.Fatal()
-	}
-	if "xmlValue" != cnf.Get("foo.baz") {
-		t.Fatalf("foo.baz = %s", cnf.Get("foo.baz"))
-	}
-	if "" != cnf.Get("commented.out") {
-		t.Fatal()
-	}
-	if 4611686018427387904 != cnf.GetInt64("cmdline.opt") {
-		t.Fatal()
-	}
-}
-
-// Test our handling of the HTRACE_CONF_DIR environment variable.
-func TestGetHTracedConfDirs(t *testing.T) {
-	os.Setenv("HTRACED_CONF_DIR", "")
-	dlog := new(bytes.Buffer)
-	dirs := getHTracedConfDirs(dlog)
-	if len(dirs) != 1 || dirs[0] != getDefaultHTracedConfDir() {
-		t.Fatal()
-	}
-	os.Setenv("HTRACED_CONF_DIR", "/foo/bar:/baz")
-	dirs = getHTracedConfDirs(dlog)
-	if len(dirs) != 2 || dirs[0] != "/foo/bar" || dirs[1] != "/baz" {
-		t.Fatal()
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/xml.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/xml.go b/htrace-htraced/go/src/org/apache/htrace/conf/xml.go
deleted file mode 100644
index de14bc5..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/conf/xml.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package conf
-
-import (
-	"encoding/xml"
-	"io"
-	"log"
-)
-
-type configuration struct {
-	Properties []propertyXml `xml:"property"`
-}
-
-type propertyXml struct {
-	Name  string `xml:"name"`
-	Value string `xml:"value"`
-}
-
-// Parse an XML configuration file.
-func parseXml(reader io.Reader, m map[string]string) error {
-	dec := xml.NewDecoder(reader)
-	configurationXml := configuration{}
-	err := dec.Decode(&configurationXml)
-	if err != nil {
-		return err
-	}
-	props := configurationXml.Properties
-	for p := range props {
-		key := props[p].Name
-		value := props[p].Value
-		if key == "" {
-			log.Println("Warning: ignoring element with missing or empty <name>.")
-			continue
-		}
-		if value == "" {
-			log.Println("Warning: ignoring element with key " + key + " with missing or empty <value>.")
-			continue
-		}
-		//log.Printf("setting %s to %s\n", key, value)
-		m[key] = value
-	}
-	return nil
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
deleted file mode 100644
index 7b64914..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
-	"fmt"
-	"github.com/ugorji/go/codec"
-	"math"
-	"math/rand"
-	htrace "org/apache/htrace/client"
-	"org/apache/htrace/common"
-	"org/apache/htrace/conf"
-	"org/apache/htrace/test"
-	"sort"
-	"sync"
-	"sync/atomic"
-	"testing"
-	"time"
-)
-
-func TestClientGetServerVersion(t *testing.T) {
-	htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerVersion",
-		DataDirs: make([]string, 2)}
-	ht, err := htraceBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create datastore: %s", err.Error())
-	}
-	defer ht.Close()
-	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
-	if err != nil {
-		t.Fatalf("failed to create client: %s", err.Error())
-	}
-	defer hcl.Close()
-	_, err = hcl.GetServerVersion()
-	if err != nil {
-		t.Fatalf("failed to call GetServerVersion: %s", err.Error())
-	}
-}
-
-func TestClientGetServerDebugInfo(t *testing.T) {
-	htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerDebugInfo",
-		DataDirs: make([]string, 2)}
-	ht, err := htraceBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create datastore: %s", err.Error())
-	}
-	defer ht.Close()
-	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
-	if err != nil {
-		t.Fatalf("failed to create client: %s", err.Error())
-	}
-	defer hcl.Close()
-	debugInfo, err := hcl.GetServerDebugInfo()
-	if err != nil {
-		t.Fatalf("failed to call GetServerDebugInfo: %s", err.Error())
-	}
-	if debugInfo.StackTraces == "" {
-		t.Fatalf(`debugInfo.StackTraces == ""`)
-	}
-	if debugInfo.GCStats == "" {
-		t.Fatalf(`debugInfo.GCStats == ""`)
-	}
-}
-
-func createRandomTestSpans(amount int) common.SpanSlice {
-	rnd := rand.New(rand.NewSource(2))
-	allSpans := make(common.SpanSlice, amount)
-	allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0])
-	for i := 1; i < amount; i++ {
-		allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i])
-	}
-	allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)}
-	return allSpans
-}
-
-func TestClientOperations(t *testing.T) {
-	htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
-		DataDirs:     make([]string, 2),
-		WrittenSpans: common.NewSemaphore(0),
-	}
-	ht, err := htraceBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create datastore: %s", err.Error())
-	}
-	defer ht.Close()
-	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
-	if err != nil {
-		t.Fatalf("failed to create client: %s", err.Error())
-	}
-	defer hcl.Close()
-
-	// Create some random trace spans.
-	NUM_TEST_SPANS := 30
-	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-
-	// Write half of the spans to htraced via the client.
-	err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2])
-	if err != nil {
-		t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
-			err.Error())
-	}
-	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS / 2))
-
-	// Look up the first half of the spans.  They should be found.
-	var span *common.Span
-	for i := 0; i < NUM_TEST_SPANS/2; i++ {
-		span, err = hcl.FindSpan(allSpans[i].Id)
-		if err != nil {
-			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
-		}
-		common.ExpectSpansEqual(t, allSpans[i], span)
-	}
-
-	// Look up the second half of the spans.  They should not be found.
-	for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ {
-		span, err = hcl.FindSpan(allSpans[i].Id)
-		if err != nil {
-			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
-		}
-		if span != nil {
-			t.Fatalf("Unexpectedly found a span we never write to "+
-				"the server: FindSpan(%d) succeeded\n", i)
-		}
-	}
-
-	// Test FindChildren
-	childSpan := allSpans[1]
-	parentId := childSpan.Parents[0]
-	var children []common.SpanId
-	children, err = hcl.FindChildren(parentId, 1)
-	if err != nil {
-		t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error())
-	}
-	if len(children) != 1 {
-		t.Fatalf("FindChildren(%s) returned an invalid number of "+
-			"children: expected %d, got %d\n", parentId, 1, len(children))
-	}
-	if !children[0].Equal(childSpan.Id) {
-		t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+
-			" got %s\n", parentId, childSpan.Id, children[0])
-	}
-
-	// Test FindChildren on a span that has no children
-	childlessSpan := allSpans[NUM_TEST_SPANS/2]
-	children, err = hcl.FindChildren(childlessSpan.Id, 10)
-	if err != nil {
-		t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, err.Error())
-	}
-	if len(children) != 0 {
-		t.Fatalf("FindChildren(%d) returned an invalid number of "+
-			"children: expected %d, got %d\n", childlessSpan.Id, 0, len(children))
-	}
-
-	// Test Query
-	var query common.Query
-	query = common.Query{Lim: 10}
-	spans, err := hcl.Query(&query)
-	if err != nil {
-		t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error())
-	}
-	if len(spans) != 10 {
-		t.Fatalf("Query({lim: %d}) returned an invalid number of "+
-			"children: expected %d, got %d\n", 10, 10, len(spans))
-	}
-}
-
-func TestDumpAll(t *testing.T) {
-	htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
-		DataDirs:     make([]string, 2),
-		WrittenSpans: common.NewSemaphore(0),
-		Cnf: map[string]string{
-			conf.HTRACE_LOG_LEVEL: "INFO",
-		},
-	}
-	ht, err := htraceBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create datastore: %s", err.Error())
-	}
-	defer ht.Close()
-	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
-	if err != nil {
-		t.Fatalf("failed to create client: %s", err.Error())
-	}
-	defer hcl.Close()
-
-	NUM_TEST_SPANS := 100
-	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-	sort.Sort(allSpans)
-	err = hcl.WriteSpans(allSpans)
-	if err != nil {
-		t.Fatalf("WriteSpans failed: %s\n", err.Error())
-	}
-	ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
-	out := make(chan *common.Span, NUM_TEST_SPANS)
-	var dumpErr error
-	go func() {
-		dumpErr = hcl.DumpAll(3, out)
-	}()
-	var numSpans int
-	nextLogTime := time.Now().Add(time.Millisecond * 5)
-	for {
-		span, channelOpen := <-out
-		if !channelOpen {
-			break
-		}
-		common.ExpectSpansEqual(t, allSpans[numSpans], span)
-		numSpans++
-		if testing.Verbose() {
-			now := time.Now()
-			if !now.Before(nextLogTime) {
-				nextLogTime = now
-				nextLogTime = nextLogTime.Add(time.Millisecond * 5)
-				fmt.Printf("read back %d span(s)...\n", numSpans)
-			}
-		}
-	}
-	if numSpans != len(allSpans) {
-		t.Fatalf("expected to read %d spans... but only read %d\n",
-			len(allSpans), numSpans)
-	}
-	if dumpErr != nil {
-		t.Fatalf("got dump error %s\n", dumpErr.Error())
-	}
-}
-
-const EXAMPLE_CONF_KEY = "example.conf.key"
-const EXAMPLE_CONF_VALUE = "foo.bar.baz"
-
-func TestClientGetServerConf(t *testing.T) {
-	htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf",
-		Cnf: map[string]string{
-			EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE,
-		},
-		DataDirs: make([]string, 2)}
-	ht, err := htraceBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create datastore: %s", err.Error())
-	}
-	defer ht.Close()
-	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
-	if err != nil {
-		t.Fatalf("failed to create client: %s", err.Error())
-	}
-	defer hcl.Close()
-	serverCnf, err2 := hcl.GetServerConf()
-	if err2 != nil {
-		t.Fatalf("failed to call GetServerConf: %s", err2.Error())
-	}
-	if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE {
-		t.Fatalf("unexpected value for %s: %s",
-			EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
-	}
-}
-
-const TEST_NUM_HRPC_HANDLERS = 2
-
-const TEST_NUM_WRITESPANS = 4
-
-// Tests that HRPC limits the number of simultaneous connections being processed.
-func TestHrpcAdmissionsControl(t *testing.T) {
-	var wg sync.WaitGroup
-	wg.Add(TEST_NUM_WRITESPANS)
-	var numConcurrentHrpcCalls int32
-	testHooks := &hrpcTestHooks{
-		HandleAdmission: func() {
-			defer wg.Done()
-			n := atomic.AddInt32(&numConcurrentHrpcCalls, 1)
-			if n > TEST_NUM_HRPC_HANDLERS {
-				t.Fatalf("The number of concurrent HRPC calls went above "+
-					"%d: it's at %d\n", TEST_NUM_HRPC_HANDLERS, n)
-			}
-			time.Sleep(1 * time.Millisecond)
-			n = atomic.AddInt32(&numConcurrentHrpcCalls, -1)
-			if n >= TEST_NUM_HRPC_HANDLERS {
-				t.Fatalf("The number of concurrent HRPC calls went above "+
-					"%d: it was at %d\n", TEST_NUM_HRPC_HANDLERS, n+1)
-			}
-		},
-	}
-	htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl",
-		DataDirs: make([]string, 2),
-		Cnf: map[string]string{
-			conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
-		},
-		WrittenSpans:  common.NewSemaphore(0),
-		HrpcTestHooks: testHooks,
-	}
-	ht, err := htraceBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create datastore: %s", err.Error())
-	}
-	defer ht.Close()
-	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
-	if err != nil {
-		t.Fatalf("failed to create client: %s", err.Error())
-	}
-	// Create some random trace spans.
-	allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
-	for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
-		go func(i int) {
-			err = hcl.WriteSpans(allSpans[i : i+1])
-			if err != nil {
-				t.Fatalf("WriteSpans failed: %s\n", err.Error())
-			}
-		}(iter)
-	}
-	wg.Wait()
-	ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS))
-}
-
-// Tests that HRPC I/O timeouts work.
-func TestHrpcIoTimeout(t *testing.T) {
-	htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout",
-		DataDirs: make([]string, 2),
-		Cnf: map[string]string{
-			conf.HTRACE_NUM_HRPC_HANDLERS:  fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
-			conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1",
-		},
-	}
-	ht, err := htraceBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create datastore: %s", err.Error())
-	}
-	defer ht.Close()
-	var hcl *htrace.Client
-	finishClient := make(chan interface{})
-	defer func() {
-		// Close the finishClient channel, if it hasn't already been closed.
-		defer func() { recover() }()
-		close(finishClient)
-	}()
-	testHooks := &htrace.TestHooks{
-		HandleWriteRequestBody: func() {
-			<-finishClient
-		},
-	}
-	hcl, err = htrace.NewClient(ht.ClientConf(), testHooks)
-	if err != nil {
-		t.Fatalf("failed to create client: %s", err.Error())
-	}
-	// Create some random trace spans.
-	allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
-	var wg sync.WaitGroup
-	wg.Add(TEST_NUM_WRITESPANS)
-	for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
-		go func(i int) {
-			defer wg.Done()
-			// Ignore the error return because there are internal retries in
-			// the client which will make this succeed eventually, usually.
-			// Keep in mind that we only block until we have seen
-			// TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
-			// we let requests through so that the test can exit cleanly.
-			hcl.WriteSpans(allSpans[i : i+1])
-		}(iter)
-	}
-	for {
-		if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS {
-			break
-		}
-		time.Sleep(1000 * time.Nanosecond)
-	}
-	close(finishClient)
-	wg.Wait()
-}
-
-func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
-	htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans",
-		Cnf: map[string]string{
-			conf.HTRACE_LOG_LEVEL: "INFO",
-			conf.HTRACE_NUM_HRPC_HANDLERS: "20",
-		},
-		WrittenSpans: common.NewSemaphore(int64(1 - N)),
-	}
-	ht, err := htraceBld.Build()
-	if err != nil {
-		panic(err)
-	}
-	defer ht.Close()
-	rnd := rand.New(rand.NewSource(1))
-	allSpans := make([]*common.Span, N)
-	for n := 0; n < N; n++ {
-		allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
-	}
-	// Determine how many calls to WriteSpans we should make.  Each writeSpans
-	// message should be small enough so that it doesn't exceed the max RPC
-	// body length limit.  TODO: a production-quality golang client would do
-	// this internally rather than needing us to do it here in the unit test.
-	bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
-	reqs := make([][]*common.Span, 0, 4)
-	curReq := -1
-	curReqLen := bodyLen
-	var curReqSpans uint32
-	mh := new(codec.MsgpackHandle)
-	mh.WriteExt = true
-	var mbuf [8192]byte
-	buf := mbuf[:0]
-	enc := codec.NewEncoderBytes(&buf, mh)
-	for n := 0; n < N; n++ {
-		span := allSpans[n]
-		if (curReqSpans >= maxSpansPerRpc) ||
-			(curReqLen >= bodyLen) {
-			reqs = append(reqs, make([]*common.Span, 0, 16))
-			curReqLen = 0
-			curReq++
-			curReqSpans = 0
-		}
-		buf = mbuf[:0]
-		enc.ResetBytes(&buf)
-		err := enc.Encode(span)
-		if err != nil {
-			panic(fmt.Sprintf("Error encoding span %s: %s\n",
-				span.String(), err.Error()))
-		}
-		bufLen := len(buf)
-		if bufLen > (bodyLen / 5) {
-			panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen))
-		}
-		curReqLen += bufLen
-		reqs[curReq] = append(reqs[curReq], span)
-		curReqSpans++
-	}
-	ht.Store.lg.Infof("num spans: %d.  num WriteSpansReq calls: %d\n", N, len(reqs))
-	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
-	if err != nil {
-		panic(fmt.Sprintf("failed to create client: %s", err.Error()))
-	}
-	defer hcl.Close()
-
-	// Reset the timer to avoid including the time required to create new
-	// random spans in the benchmark total.
-	if b != nil {
-		b.ResetTimer()
-	}
-
-	// Write many random spans.
-	for reqIdx := range reqs {
-		go func(i int) {
-			err = hcl.WriteSpans(reqs[i])
-			if err != nil {
-				panic(fmt.Sprintf("failed to send WriteSpans request %d: %s",
-					i, err.Error()))
-			}
-		}(reqIdx)
-	}
-	// Wait for all the spans to be written.
-	ht.Store.WrittenSpans.Wait()
-}
-
-// This is a test of how quickly we can create new spans via WriteSpans RPCs.
-// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore.
-// Unlike that benchmark, it sends the spans via RPC.
-// Suggested flags for running this:
-// -tags unsafe -cpu 16 -benchtime=1m
-func BenchmarkWriteSpans(b *testing.B) {
-	doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b)
-}
-
-func TestWriteSpansRpcs(t *testing.T) {
-	doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil)
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
deleted file mode 100644
index 82fb7b5..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ /dev/null
@@ -1,1340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
-	"bytes"
-	"encoding/hex"
-	"errors"
-	"fmt"
-	"github.com/jmhodges/levigo"
-	"github.com/ugorji/go/codec"
-	"org/apache/htrace/common"
-	"org/apache/htrace/conf"
-	"strconv"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"time"
-)
-
-//
-// The data store code for HTraced.
-//
-// This code stores the trace spans.  We use levelDB here so that we don't have to store everything
-// in memory at all times.  The data is sharded across multiple levelDB databases in multiple
-// directories.  Normally, these multiple directories will be on multiple disk drives.
-//
-// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
-// coming from many daemons.  Durability is not as big a concern as in some data stores, since
-// losing a little bit of trace data if htraced goes down is not critical.  We use msgpack
-// for serialization.  We assume that there will be many more writes than reads.
-//
-// Schema
-// w -> ShardInfo
-// s[8-byte-big-endian-sid] -> SpanData
-// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
-// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
-// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
-// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
-//
-// Note that span IDs are unsigned 64-bit numbers.
-// Begin times, end times, and durations are signed 64-bit numbers.
-// In order to get LevelDB to properly compare the signed 64-bit quantities,
-// we flip the highest bit.  This way, we can get leveldb to view negative
-// quantities as less than non-negative ones.  This also means that we can do
-// all queries using unsigned 64-bit math, rather than having to special-case
-// the signed fields.
-//
-
-var EMPTY_BYTE_BUF []byte = []byte{}
-
-const SPAN_ID_INDEX_PREFIX = 's'
-const BEGIN_TIME_INDEX_PREFIX = 'b'
-const END_TIME_INDEX_PREFIX = 'e'
-const DURATION_INDEX_PREFIX = 'd'
-const PARENT_ID_INDEX_PREFIX = 'p'
-const INVALID_INDEX_PREFIX = 0
-
-// The maximum span expiry time, in milliseconds.
-// For all practical purposes this is "never" since it's more than a million years.
-const MAX_SPAN_EXPIRY_MS = 0x7ffffffffffffff
-
-type IncomingSpan struct {
-	// The address that the span was sent from.
-	Addr string
-
-	// The span.
-	*common.Span
-
-	// Serialized span data
-	SpanDataBytes []byte
-}
-
-// A single directory containing a levelDB instance.
-type shard struct {
-	// The data store that this shard is part of
-	store *dataStore
-
-	// The LevelDB instance.
-	ldb *levigo.DB
-
-	// The path to the leveldb directory this shard is managing.
-	path string
-
-	// Incoming requests to write Spans.
-	incoming chan []*IncomingSpan
-
-	// A channel for incoming heartbeats
-	heartbeats chan interface{}
-
-	// Tracks whether the shard goroutine has exited.
-	exited sync.WaitGroup
-}
-
-// Process incoming spans for a shard.
-func (shd *shard) processIncoming() {
-	lg := shd.store.lg
-	defer func() {
-		lg.Infof("Shard processor for %s exiting.\n", shd.path)
-		shd.exited.Done()
-	}()
-	for {
-		select {
-		case spans := <-shd.incoming:
-			if spans == nil {
-				return
-			}
-			totalWritten := 0
-			totalDropped := 0
-			for spanIdx := range spans {
-				err := shd.writeSpan(spans[spanIdx])
-				if err != nil {
-					lg.Errorf("Shard processor for %s got fatal error %s.\n",
-						shd.path, err.Error())
-					totalDropped++
-				} else {
-					if lg.TraceEnabled() {
-						lg.Tracef("Shard processor for %s wrote span %s.\n",
-							shd.path, spans[spanIdx].ToJson())
-					}
-					totalWritten++
-				}
-			}
-			shd.store.msink.UpdatePersisted(spans[0].Addr, totalWritten, totalDropped)
-			if shd.store.WrittenSpans != nil {
-				lg.Debugf("Shard %s incrementing WrittenSpans by %d\n", shd.path, len(spans))
-				shd.store.WrittenSpans.Posts(int64(len(spans)))
-			}
-		case <-shd.heartbeats:
-			lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path)
-			shd.pruneExpired()
-		}
-	}
-}
-
-func (shd *shard) pruneExpired() {
-	lg := shd.store.rpr.lg
-	src, err := CreateReaperSource(shd)
-	if err != nil {
-		lg.Errorf("Error creating reaper source for shd(%s): %s\n",
-			shd.path, err.Error())
-		return
-	}
-	var totalReaped uint64
-	defer func() {
-		src.Close()
-		if totalReaped > 0 {
-			atomic.AddUint64(&shd.store.rpr.ReapedSpans, totalReaped)
-		}
-	}()
-	urdate := s2u64(shd.store.rpr.GetReaperDate())
-	for {
-		span := src.next()
-		if span == nil {
-			lg.Debugf("After reaping %d span(s), no more found in shard %s "+
-				"to reap.\n", totalReaped, shd.path)
-			return
-		}
-		begin := s2u64(span.Begin)
-		if begin >= urdate {
-			lg.Debugf("After reaping %d span(s), the remaining spans in "+
-				"shard %s are new enough to be kept\n",
-				totalReaped, shd.path)
-			return
-		}
-		err = shd.DeleteSpan(span)
-		if err != nil {
-			lg.Errorf("Error deleting span %s from shd(%s): %s\n",
-				span.String(), shd.path, err.Error())
-			return
-		}
-		if lg.TraceEnabled() {
-			lg.Tracef("Reaped span %s from shard %s\n", span.String(), shd.path)
-		}
-		totalReaped++
-	}
-}
-
-// Delete a span from the shard.  Note that leveldb may retain the data until
-// compaction(s) remove it.
-func (shd *shard) DeleteSpan(span *common.Span) error {
-	batch := levigo.NewWriteBatch()
-	defer batch.Close()
-	primaryKey :=
-		append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
-	batch.Delete(primaryKey)
-	for parentIdx := range span.Parents {
-		key := append(append([]byte{PARENT_ID_INDEX_PREFIX},
-			span.Parents[parentIdx].Val()...), span.Id.Val()...)
-		batch.Delete(key)
-	}
-	beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX},
-		u64toSlice(s2u64(span.Begin))...), span.Id.Val()...)
-	batch.Delete(beginTimeKey)
-	endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX},
-		u64toSlice(s2u64(span.End))...), span.Id.Val()...)
-	batch.Delete(endTimeKey)
-	durationKey := append(append([]byte{DURATION_INDEX_PREFIX},
-		u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
-	batch.Delete(durationKey)
-	err := shd.ldb.Write(shd.store.writeOpts, batch)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-// Convert a signed 64-bit number into an unsigned 64-bit number.  We flip the
-// highest bit, so that negative input values map to unsigned numbers which are
-// less than non-negative input values.
-func s2u64(val int64) uint64 {
-	ret := uint64(val)
-	ret ^= 0x8000000000000000
-	return ret
-}
-
-func u64toSlice(val uint64) []byte {
-	return []byte{
-		byte(0xff & (val >> 56)),
-		byte(0xff & (val >> 48)),
-		byte(0xff & (val >> 40)),
-		byte(0xff & (val >> 32)),
-		byte(0xff & (val >> 24)),
-		byte(0xff & (val >> 16)),
-		byte(0xff & (val >> 8)),
-		byte(0xff & (val >> 0))}
-}
-
-func (shd *shard) writeSpan(ispan *IncomingSpan) error {
-	batch := levigo.NewWriteBatch()
-	defer batch.Close()
-	span := ispan.Span
-	primaryKey :=
-		append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
-	batch.Put(primaryKey, ispan.SpanDataBytes)
-
-	// Add this to the parent index.
-	for parentIdx := range span.Parents {
-		key := append(append([]byte{PARENT_ID_INDEX_PREFIX},
-			span.Parents[parentIdx].Val()...), span.Id.Val()...)
-		batch.Put(key, EMPTY_BYTE_BUF)
-	}
-
-	// Add to the other secondary indices.
-	beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX},
-		u64toSlice(s2u64(span.Begin))...), span.Id.Val()...)
-	batch.Put(beginTimeKey, EMPTY_BYTE_BUF)
-	endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX},
-		u64toSlice(s2u64(span.End))...), span.Id.Val()...)
-	batch.Put(endTimeKey, EMPTY_BYTE_BUF)
-	durationKey := append(append([]byte{DURATION_INDEX_PREFIX},
-		u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
-	batch.Put(durationKey, EMPTY_BYTE_BUF)
-
-	err := shd.ldb.Write(shd.store.writeOpts, batch)
-	if err != nil {
-		shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n",
-			span.String(), shd.path, err.Error())
-		return err
-	}
-	return nil
-}
-
-func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId,
-	lim int32) ([]common.SpanId, int32, error) {
-	searchKey := append([]byte{PARENT_ID_INDEX_PREFIX}, sid.Val()...)
-	iter := shd.ldb.NewIterator(shd.store.readOpts)
-	defer iter.Close()
-	iter.Seek(searchKey)
-	for {
-		if !iter.Valid() {
-			break
-		}
-		if lim == 0 {
-			break
-		}
-		key := iter.Key()
-		if !bytes.HasPrefix(key, searchKey) {
-			break
-		}
-		id := common.SpanId(key[17:])
-		childIds = append(childIds, id)
-		lim--
-		iter.Next()
-	}
-	return childIds, lim, nil
-}
-
-// Close a shard.
-func (shd *shard) Close() {
-	lg := shd.store.lg
-	shd.incoming <- nil
-	lg.Infof("Waiting for %s to exit...\n", shd.path)
-	shd.exited.Wait()
-	shd.ldb.Close()
-	lg.Infof("Closed %s...\n", shd.path)
-}
-
-type Reaper struct {
-	// The logger used by the reaper
-	lg *common.Logger
-
-	// The number of milliseconds to keep spans around, in milliseconds.
-	spanExpiryMs int64
-
-	// The oldest date for which we'll keep spans.
-	reaperDate int64
-
-	// A channel used to send heartbeats to the reaper
-	heartbeats chan interface{}
-
-	// Tracks whether the reaper goroutine has exited
-	exited sync.WaitGroup
-
-	// The lock protecting reaper data.
-	lock sync.Mutex
-
-	// The reaper heartbeater
-	hb *Heartbeater
-
-	// The total number of spans which have been reaped.
-	ReapedSpans uint64
-}
-
-func NewReaper(cnf *conf.Config) *Reaper {
-	rpr := &Reaper{
-		lg:           common.NewLogger("reaper", cnf),
-		spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
-		heartbeats:   make(chan interface{}, 1),
-	}
-	if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
-		rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
-	} else if rpr.spanExpiryMs <= 0 {
-		rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
-	}
-	rpr.hb = NewHeartbeater("ReaperHeartbeater",
-		cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg)
-	rpr.exited.Add(1)
-	go rpr.run()
-	rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{
-		name:       "reaper",
-		targetChan: rpr.heartbeats,
-	})
-	var when string
-	if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
-		when = "never"
-	} else {
-		when = "after " + time.Duration(rpr.spanExpiryMs).String()
-	}
-	rpr.lg.Infof("Initializing span reaper: span time out = %s.\n", when)
-	return rpr
-}
-
-func (rpr *Reaper) run() {
-	defer func() {
-		rpr.lg.Info("Exiting Reaper goroutine.\n")
-		rpr.exited.Done()
-	}()
-
-	for {
-		_, isOpen := <-rpr.heartbeats
-		if !isOpen {
-			return
-		}
-		rpr.handleHeartbeat()
-	}
-}
-
-func (rpr *Reaper) handleHeartbeat() {
-	// TODO: check dataStore fullness
-	now := common.TimeToUnixMs(time.Now().UTC())
-	d, updated := func() (int64, bool) {
-		rpr.lock.Lock()
-		defer rpr.lock.Unlock()
-		newReaperDate := now - rpr.spanExpiryMs
-		if newReaperDate > rpr.reaperDate {
-			rpr.reaperDate = newReaperDate
-			return rpr.reaperDate, true
-		} else {
-			return rpr.reaperDate, false
-		}
-	}()
-	if rpr.lg.DebugEnabled() {
-		if updated {
-			rpr.lg.Debugf("Updating UTC reaper date to %s.\n",
-				common.UnixMsToTime(d).Format(time.RFC3339))
-		} else {
-			rpr.lg.Debugf("Not updating previous reaperDate of %s.\n",
-				common.UnixMsToTime(d).Format(time.RFC3339))
-		}
-	}
-}
-
-func (rpr *Reaper) GetReaperDate() int64 {
-	rpr.lock.Lock()
-	defer rpr.lock.Unlock()
-	return rpr.reaperDate
-}
-
-func (rpr *Reaper) SetReaperDate(rdate int64) {
-	rpr.lock.Lock()
-	defer rpr.lock.Unlock()
-	rpr.reaperDate = rdate
-}
-
-func (rpr *Reaper) Shutdown() {
-	rpr.hb.Shutdown()
-	close(rpr.heartbeats)
-}
-
-// The Data Store.
-type dataStore struct {
-	lg *common.Logger
-
-	// The shards which manage our LevelDB instances.
-	shards []*shard
-
-	// The read options to use for LevelDB.
-	readOpts *levigo.ReadOptions
-
-	// The write options to use for LevelDB.
-	writeOpts *levigo.WriteOptions
-
-	// If non-null, a semaphore we will increment once for each span we receive.
-	// Used for testing.
-	WrittenSpans *common.Semaphore
-
-	// The metrics sink.
-	msink *MetricsSink
-
-	// The heartbeater which periodically asks shards to update the MetricsSink.
-	hb *Heartbeater
-
-	// The reaper for this datastore
-	rpr *Reaper
-
-	// When this datastore was started (in UTC milliseconds since the epoch)
-	startMs int64
-}
-
-func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) {
-	dld := NewDataStoreLoader(cnf)
-	defer dld.Close()
-	err := dld.Load()
-	if err != nil {
-		dld.lg.Errorf("Error loading datastore: %s\n", err.Error())
-		return nil, err
-	}
-	store := &dataStore {
-		lg: dld.lg,
-		shards: make([]*shard, len(dld.shards)),
-		readOpts: dld.readOpts,
-		writeOpts: dld.writeOpts,
-		WrittenSpans: writtenSpans,
-		msink: NewMetricsSink(cnf),
-		hb: NewHeartbeater("DatastoreHeartbeater",
-			cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), dld.lg),
-		rpr: NewReaper(cnf),
-		startMs: common.TimeToUnixMs(time.Now().UTC()),
-	}
-	spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
-	for shdIdx := range store.shards {
-		shd := &shard {
-			store: store,
-			ldb: dld.shards[shdIdx].ldb,
-			path: dld.shards[shdIdx].path,
-			incoming: make(chan []*IncomingSpan, spanBufferSize),
-			heartbeats: make(chan interface{}, 1),
-		}
-		shd.exited.Add(1)
-		go shd.processIncoming()
-		store.shards[shdIdx] = shd
-		store.hb.AddHeartbeatTarget(&HeartbeatTarget{
-			name:       fmt.Sprintf("shard(%s)", shd.path),
-			targetChan: shd.heartbeats,
-		})
-	}
-	dld.DisownResources()
-	return store, nil
-}
-
-// Close the DataStore.
-func (store *dataStore) Close() {
-	if store.hb != nil {
-		store.hb.Shutdown()
-		store.hb = nil
-	}
-	for idx := range store.shards {
-		if store.shards[idx] != nil {
-			store.shards[idx].Close()
-			store.shards[idx] = nil
-		}
-	}
-	if store.rpr != nil {
-		store.rpr.Shutdown()
-		store.rpr = nil
-	}
-	if store.readOpts != nil {
-		store.readOpts.Close()
-		store.readOpts = nil
-	}
-	if store.writeOpts != nil {
-		store.writeOpts.Close()
-		store.writeOpts = nil
-	}
-	if store.lg != nil {
-		store.lg.Close()
-		store.lg = nil
-	}
-}
-
-// Get the index of the shard which stores the given spanId.
-func (store *dataStore) getShardIndex(sid common.SpanId) int {
-	return int(sid.Hash32() % uint32(len(store.shards)))
-}
-
-const WRITESPANS_BATCH_SIZE = 128
-
-// SpanIngestor is a class used internally to ingest spans from an RPC
-// endpoint.  It groups spans destined for a particular shard into small
-// batches, so that we can reduce the number of objects that need to be sent
-// over the shard's "incoming" channel.  Since sending objects over a channel
-// requires goroutine synchronization, this improves performance.
-//
-// SpanIngestor also allows us to reuse the same encoder object for many spans,
-// rather than creating a new encoder per span.  This avoids re-doing the
-// encoder setup for each span, and also generates less garbage.
-type SpanIngestor struct {
-	// The logger to use.
-	lg *common.Logger
-
-	// The dataStore we are ingesting spans into.
-	store *dataStore
-
-	// The remote address these spans are coming from.
-	addr string
-
-	// Default TracerId
-	defaultTrid string
-
-	// The msgpack handle to use to serialize the spans.
-	mh codec.MsgpackHandle
-
-	// The msgpack encoder to use to serialize the spans.
-	// Caching this avoids generating a lot of garbage and burning CPUs
-	// creating new encoder objects for each span.
-	enc *codec.Encoder
-
-	// The buffer which codec.Encoder is currently serializing to.
-	// We have to create a new buffer for each span because once we hand it off to the shard, the
-	// shard manages the buffer lifecycle.
-	spanDataBytes []byte
-
-	// An array mapping shard index to span batch.
-	batches []*SpanIngestorBatch
-
-	// The total number of spans ingested.  Includes dropped spans.
-	totalIngested int
-
-	// The total number of spans the ingestor dropped because of a server-side error.
-	serverDropped int
-}
-
-// A batch of spans destined for a particular shard.
-type SpanIngestorBatch struct {
-	incoming []*IncomingSpan
-}
-
-func (store *dataStore) NewSpanIngestor(lg *common.Logger,
-	addr string, defaultTrid string) *SpanIngestor {
-	ing := &SpanIngestor{
-		lg:            lg,
-		store:         store,
-		addr:          addr,
-		defaultTrid:   defaultTrid,
-		spanDataBytes: make([]byte, 0, 1024),
-		batches:       make([]*SpanIngestorBatch, len(store.shards)),
-	}
-	ing.mh.WriteExt = true
-	ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh)
-	for batchIdx := range ing.batches {
-		ing.batches[batchIdx] = &SpanIngestorBatch{
-			incoming: make([]*IncomingSpan, 0, WRITESPANS_BATCH_SIZE),
-		}
-	}
-	return ing
-}
-
-func (ing *SpanIngestor) IngestSpan(span *common.Span) {
-	ing.totalIngested++
-	// Make sure the span ID is valid.
-	spanIdProblem := span.Id.FindProblem()
-	if spanIdProblem != "" {
-		// Can't print the invalid span ID because String() might fail.
-		ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem)
-		ing.serverDropped++
-		return
-	}
-
-	// Set the default tracer id, if needed.
-	if span.TracerId == "" {
-		span.TracerId = ing.defaultTrid
-	}
-
-	// Encode the span data.  Doing the encoding here is better than doing it
-	// in the shard goroutine, because we can achieve more parallelism.
-	// There is one shard goroutine per shard, but potentially many more
-	// ingestors per shard.
-	err := ing.enc.Encode(span.SpanData)
-	if err != nil {
-		ing.lg.Warnf("Failed to encode span ID %s: %s\n",
-			span.Id.String(), err.Error())
-		ing.serverDropped++
-		return
-	}
-	spanDataBytes := ing.spanDataBytes
-	ing.spanDataBytes = make([]byte, 0, 1024)
-	ing.enc.ResetBytes(&ing.spanDataBytes)
-
-	// Determine which shard this span should go to.
-	shardIdx := ing.store.getShardIndex(span.Id)
-	batch := ing.batches[shardIdx]
-	incomingLen := len(batch.incoming)
-	if ing.lg.TraceEnabled() {
-		ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, "+
-			"incomingLen=%d, cap(batch.incoming)=%d\n",
-			span.Id.String(), shardIdx, incomingLen, cap(batch.incoming))
-	}
-	if incomingLen+1 == cap(batch.incoming) {
-		if ing.lg.TraceEnabled() {
-			ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d spans for "+
-				"shard %d\n", len(batch.incoming), shardIdx)
-		}
-		ing.store.WriteSpans(shardIdx, batch.incoming)
-		batch.incoming = make([]*IncomingSpan, 1, WRITESPANS_BATCH_SIZE)
-		incomingLen = 0
-	} else {
-		batch.incoming = batch.incoming[0 : incomingLen+1]
-	}
-	batch.incoming[incomingLen] = &IncomingSpan{
-		Addr:          ing.addr,
-		Span:          span,
-		SpanDataBytes: spanDataBytes,
-	}
-}
-
-func (ing *SpanIngestor) Close(startTime time.Time) {
-	for shardIdx := range ing.batches {
-		batch := ing.batches[shardIdx]
-		if len(batch.incoming) > 0 {
-			if ing.lg.TraceEnabled() {
-				ing.lg.Tracef("SpanIngestor#Close: flushing %d span(s) for "+
-					"shard %d\n", len(batch.incoming), shardIdx)
-			}
-			ing.store.WriteSpans(shardIdx, batch.incoming)
-		}
-		batch.incoming = nil
-	}
-	ing.lg.Debugf("Closed span ingestor for %s.  Ingested %d span(s); dropped "+
-		"%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped)
-
-	endTime := time.Now()
-	ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested,
-		ing.serverDropped, endTime.Sub(startTime))
-}
-
-func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) {
-	store.shards[shardIdx].incoming <- ispans
-}
-
-func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
-	return store.shards[store.getShardIndex(sid)].FindSpan(sid)
-}
-
-func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
-	lg := shd.store.lg
-	primaryKey := append([]byte{SPAN_ID_INDEX_PREFIX}, sid.Val()...)
-	buf, err := shd.ldb.Get(shd.store.readOpts, primaryKey)
-	if err != nil {
-		if strings.Index(err.Error(), "NotFound:") != -1 {
-			return nil
-		}
-		lg.Warnf("Shard(%s): FindSpan(%s) error: %s\n",
-			shd.path, sid.String(), err.Error())
-		return nil
-	}
-	var span *common.Span
-	span, err = shd.decodeSpan(sid, buf)
-	if err != nil {
-		lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding [%s]\n",
-			shd.path, sid.String(), err.Error(), hex.EncodeToString(buf))
-		return nil
-	}
-	return span
-}
-
-func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) {
-	r := bytes.NewBuffer(buf)
-	mh := new(codec.MsgpackHandle)
-	mh.WriteExt = true
-	decoder := codec.NewDecoder(r, mh)
-	data := common.SpanData{}
-	err := decoder.Decode(&data)
-	if err != nil {
-		return nil, err
-	}
-	if data.Parents == nil {
-		data.Parents = []common.SpanId{}
-	}
-	return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
-}
-
-// Find the children of a given span id.
-func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId {
-	childIds := make([]common.SpanId, 0)
-	var err error
-
-	startIdx := store.getShardIndex(sid)
-	idx := startIdx
-	numShards := len(store.shards)
-	for {
-		if lim == 0 {
-			break
-		}
-		shd := store.shards[idx]
-		childIds, lim, err = shd.FindChildren(sid, childIds, lim)
-		if err != nil {
-			store.lg.Errorf("Shard(%s): FindChildren(%s) error: %s\n",
-				shd.path, sid.String(), err.Error())
-		}
-		idx++
-		if idx >= numShards {
-			idx = 0
-		}
-		if idx == startIdx {
-			break
-		}
-	}
-	return childIds
-}
-
-type predicateData struct {
-	*common.Predicate
-	key []byte
-}
-
-func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
-	p := predicateData{Predicate: pred}
-
-	// Parse the input value given to make sure it matches up with the field
-	// type.
-	switch pred.Field {
-	case common.SPAN_ID:
-		// Span IDs are sent as hex strings.
-		var id common.SpanId
-		if err := id.FromString(pred.Val); err != nil {
-			return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s",
-				pred.Val, err.Error()))
-		}
-		p.key = id.Val()
-		break
-	case common.DESCRIPTION:
-		// Any string is valid for a description.
-		p.key = []byte(pred.Val)
-		break
-	case common.BEGIN_TIME, common.END_TIME, common.DURATION:
-		// Parse a base-10 signed numeric field.
-		v, err := strconv.ParseInt(pred.Val, 10, 64)
-		if err != nil {
-			return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s",
-				pred.Field, pred.Val, err.Error()))
-		}
-		p.key = u64toSlice(s2u64(v))
-		break
-	case common.TRACER_ID:
-		// Any string is valid for a tracer ID.
-		p.key = []byte(pred.Val)
-		break
-	default:
-		return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field))
-	}
-
-	// Validate the predicate operation.
-	switch pred.Op {
-	case common.EQUALS, common.LESS_THAN_OR_EQUALS,
-		common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN:
-		break
-	case common.CONTAINS:
-		if p.fieldIsNumeric() {
-			return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+
-				"numeric field like '%s'", pred.Field))
-		}
-	default:
-		return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'",
-			pred.Op))
-	}
-
-	return &p, nil
-}
-
-// Get the index prefix for this predicate, or 0 if it is not indexed.
-func (pred *predicateData) getIndexPrefix() byte {
-	switch pred.Field {
-	case common.SPAN_ID:
-		return SPAN_ID_INDEX_PREFIX
-	case common.BEGIN_TIME:
-		return BEGIN_TIME_INDEX_PREFIX
-	case common.END_TIME:
-		return END_TIME_INDEX_PREFIX
-	case common.DURATION:
-		return DURATION_INDEX_PREFIX
-	default:
-		return INVALID_INDEX_PREFIX
-	}
-}
-
-// Returns true if the predicate type is numeric.
-func (pred *predicateData) fieldIsNumeric() bool {
-	switch pred.Field {
-	case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION:
-		return true
-	default:
-		return false
-	}
-}
-
-// Get the values that this predicate cares about for a given span.
-func (pred *predicateData) extractRelevantSpanData(span *common.Span) []byte {
-	switch pred.Field {
-	case common.SPAN_ID:
-		return span.Id.Val()
-	case common.DESCRIPTION:
-		return []byte(span.Description)
-	case common.BEGIN_TIME:
-		return u64toSlice(s2u64(span.Begin))
-	case common.END_TIME:
-		return u64toSlice(s2u64(span.End))
-	case common.DURATION:
-		return u64toSlice(s2u64(span.Duration()))
-	case common.TRACER_ID:
-		return []byte(span.TracerId)
-	default:
-		panic(fmt.Sprintf("Unknown field type %s.", pred.Field))
-	}
-}
-
-func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
-	// nil is after everything.
-	if a == nil {
-		if b == nil {
-			return false
-		}
-		return false
-	} else if b == nil {
-		return true
-	}
-	// Compare the spans according to this predicate.
-	aVal := pred.extractRelevantSpanData(a)
-	bVal := pred.extractRelevantSpanData(b)
-	cmp := bytes.Compare(aVal, bVal)
-	if pred.Op.IsDescending() {
-		return cmp > 0
-	} else {
-		return cmp < 0
-	}
-}
-
-type satisfiedByReturn int
-
-const (
-	NOT_SATISFIED satisfiedByReturn = iota
-	NOT_YET_SATISFIED = iota
-	SATISFIED = iota
-)
-
-func (r satisfiedByReturn) String() string {
-	switch (r) {
-	case NOT_SATISFIED:
-		return "NOT_SATISFIED"
-	case NOT_YET_SATISFIED:
-		return "NOT_YET_SATISFIED"
-	case SATISFIED:
-		return "SATISFIED"
-	default:
-		return "(unknown)"
-	}
-}
-
-// Determine whether the predicate is satisfied by the given span.
-func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn {
-	val := pred.extractRelevantSpanData(span)
-	switch pred.Op {
-	case common.CONTAINS:
-		if bytes.Contains(val, pred.key) {
-			return SATISFIED
-		} else {
-			return NOT_SATISFIED
-		}
-	case common.EQUALS:
-		if bytes.Equal(val, pred.key) {
-			return SATISFIED
-		} else {
-			return NOT_SATISFIED
-		}
-	case common.LESS_THAN_OR_EQUALS:
-		if bytes.Compare(val, pred.key) <= 0 {
-			return SATISFIED
-		} else {
-			return NOT_YET_SATISFIED
-		}
-	case common.GREATER_THAN_OR_EQUALS:
-		if bytes.Compare(val, pred.key) >= 0 {
-			return SATISFIED
-		} else {
-			return NOT_SATISFIED
-		}
-	case common.GREATER_THAN:
-		cmp := bytes.Compare(val, pred.key)
-		if cmp <= 0 {
-			return NOT_YET_SATISFIED
-		} else {
-			return SATISFIED
-		}
-	default:
-		panic(fmt.Sprintf("unknown Op type %s should have been caught "+
-			"during normalization", pred.Op))
-	}
-}
-
-func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) {
-	var ret *source
-	src := source{store: store,
-		pred:      pred,
-		shards:    make([]*shard, len(store.shards)),
-		iters:     make([]*levigo.Iterator, 0, len(store.shards)),
-		nexts:     make([]*common.Span, len(store.shards)),
-		numRead:   make([]int, len(store.shards)),
-		keyPrefix: pred.getIndexPrefix(),
-	}
-	if src.keyPrefix == INVALID_INDEX_PREFIX {
-		return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
-			"predicate on field %s", pred.Field))
-	}
-	defer func() {
-		if ret == nil {
-			src.Close()
-		}
-	}()
-	for shardIdx := range store.shards {
-		shd := store.shards[shardIdx]
-		src.shards[shardIdx] = shd
-		src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
-	}
-	var searchKey []byte
-	lg := store.lg
-	if prev != nil {
-		// If prev != nil, this query RPC is the continuation of a previous
-		// one.  The final result returned the last time is 'prev'.
-		//
-		// To avoid returning the same results multiple times, we adjust the
-		// predicate here.  If the predicate is on the span id field, we
-		// simply manipulate the span ID we're looking for.
-		//
-		// If the predicate is on a secondary index, we also use span ID, but
-		// in a slightly different way.  Since the secondary indices are
-		// organized as [type-code][8b-secondary-key][8b-span-id], elements
-		// with the same secondary index field are ordered by span ID.  So we
-		// create a 17-byte key incorporating the span ID from 'prev.'
-		startId := common.INVALID_SPAN_ID
-		switch pred.Op {
-		case common.EQUALS:
-			if pred.Field == common.SPAN_ID {
-				// This is an annoying corner case.  There can only be one
-				// result each time we do an EQUALS search for a span id.
-				// Span id is the primary key for all our spans.
-				// But for some reason someone is asking for another result.
-				// We modify the query to search for the illegal 0 span ID,
-				// which will never be present.
-				if lg.DebugEnabled() {
-					lg.Debugf("Attempted to use a continuation token with an EQUALS "+
-						"SPAN_ID query. %s.  Setting search id = 0",
-						pred.Predicate.String())
-				}
-				startId = common.INVALID_SPAN_ID
-			} else {
-				// When doing an EQUALS search on a secondary index, the
-				// results are sorted by span id.
-				startId = prev.Id.Next()
-			}
-		case common.LESS_THAN_OR_EQUALS:
-			// Subtract one from the previous span id.  Since the previous
-			// start ID will never be 0 (0 is an illegal span id), we'll never
-			// wrap around when doing this.
-			startId = prev.Id.Prev()
-		case common.GREATER_THAN_OR_EQUALS:
-			// We can't add one to the span id, since the previous span ID
-			// might be the maximum value.  So just switch over to using
-			// GREATER_THAN.
-			pred.Op = common.GREATER_THAN
-			startId = prev.Id
-		case common.GREATER_THAN:
-			// This one is easy.
-			startId = prev.Id
-		default:
-			str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String())
-			lg.Error(str + "\n")
-			panic(str)
-		}
-		if pred.Field == common.SPAN_ID {
-			pred.key = startId.Val()
-			searchKey = append([]byte{src.keyPrefix}, startId.Val()...)
-		} else {
-			// Start where the previous query left off.  This means adjusting
-			// our uintKey.
-			pred.key = pred.extractRelevantSpanData(prev)
-			searchKey = append(append([]byte{src.keyPrefix}, pred.key...),
-				startId.Val()...)
-		}
-		if lg.TraceEnabled() {
-			lg.Tracef("Handling continuation token %s for %s.  startId=%d, "+
-				"pred.uintKey=%s\n", prev, pred.Predicate.String(), startId,
-				hex.EncodeToString(pred.key))
-		}
-	} else {
-		searchKey = append([]byte{src.keyPrefix}, pred.key...)
-	}
-	for i := range src.iters {
-		src.iters[i].Seek(searchKey)
-	}
-	ret = &src
-	return ret, nil
-}
-
-// A source of spans.
-type source struct {
-	store     *dataStore
-	pred      *predicateData
-	shards    []*shard
-	iters     []*levigo.Iterator
-	nexts     []*common.Span
-	numRead   []int
-	keyPrefix byte
-}
-
-func CreateReaperSource(shd *shard) (*source, error) {
-	store := shd.store
-	p := &common.Predicate{
-		Op:    common.GREATER_THAN_OR_EQUALS,
-		Field: common.BEGIN_TIME,
-		Val:   common.INVALID_SPAN_ID.String(),
-	}
-	pred, err := loadPredicateData(p)
-	if err != nil {
-		return nil, err
-	}
-	src := &source{
-		store:     store,
-		pred:      pred,
-		shards:    []*shard{shd},
-		iters:     make([]*levigo.Iterator, 1),
-		nexts:     make([]*common.Span, 1),
-		numRead:   make([]int, 1),
-		keyPrefix: pred.getIndexPrefix(),
-	}
-	iter := shd.ldb.NewIterator(store.readOpts)
-	src.iters[0] = iter
-	searchKey := append(append([]byte{src.keyPrefix}, pred.key...),
-		pred.key...)
-	iter.Seek(searchKey)
-	return src, nil
-}
-
-// Fill in the entry in the 'next' array for a specific shard.
-func (src *source) populateNextFromShard(shardIdx int) {
-	lg := src.store.lg
-	var err error
-	iter := src.iters[shardIdx]
-	shdPath := src.shards[shardIdx].path
-	if iter == nil {
-		lg.Debugf("Can't populate: No more entries in shard %s\n", shdPath)
-		return // There are no more entries in this shard.
-	}
-	if src.nexts[shardIdx] != nil {
-		lg.Debugf("No need to populate shard %s\n", shdPath)
-		return // We already have a valid entry for this shard.
-	}
-	for {
-		if !iter.Valid() {
-			lg.Debugf("Can't populate: Iterator for shard %s is no longer valid.\n", shdPath)
-			break // Can't read past end of DB
-		}
-		src.numRead[shardIdx]++
-		key := iter.Key()
-		if len(key) < 1 {
-			lg.Warnf("Encountered invalid zero-byte key in shard %s.\n", shdPath)
-			break
-		}
-		ret := src.checkKeyPrefix(key[0], iter)
-		if ret == NOT_SATISFIED {
-			break // Can't read past end of indexed section
-		} else if ret == NOT_YET_SATISFIED {
-			if src.pred.Op.IsDescending() {
-				iter.Prev()
-			} else {
-				iter.Next()
-			}
-			continue // Try again because we are not yet at the indexed section.
-		}
-		var span *common.Span
-		var sid common.SpanId
-		if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
-			// The span id maps to the span itself.
-			sid = common.SpanId(key[1:17])
-			span, err = src.shards[shardIdx].decodeSpan(sid, iter.Value())
-			if err != nil {
-				if lg.DebugEnabled() {
-					lg.Debugf("Internal error decoding span %s in shard %s: %s\n",
-						sid.String(), shdPath, err.Error())
-				}
-				break
-			}
-		} else {
-			// With a secondary index, we have to look up the span by id.
-			sid = common.SpanId(key[9:25])
-			span = src.shards[shardIdx].FindSpan(sid)
-			if span == nil {
-				if lg.DebugEnabled() {
-					lg.Debugf("Internal error rehydrating span %s in shard %s\n",
-						sid.String(), shdPath)
-				}
-				break
-			}
-		}
-		if src.pred.Op.IsDescending() {
-			iter.Prev()
-		} else {
-			iter.Next()
-		}
-		ret = src.pred.satisfiedBy(span)
-		if ret == SATISFIED {
-			if lg.DebugEnabled() {
-				lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath)
-			}
-			src.nexts[shardIdx] = span // Found valid entry
-			return
-		}
-		if ret == NOT_SATISFIED {
-			// This and subsequent entries don't satisfy predicate
-			break
-		}
-	}
-	lg.Debugf("Closing iterator for shard %s.\n", shdPath)
-	iter.Close()
-	src.iters[shardIdx] = nil
-}
-
-// Check the key prefix against the key prefix of the query.
-func (src *source) checkKeyPrefix(kp byte, iter *levigo.Iterator) satisfiedByReturn {
-	if kp == src.keyPrefix {
-		return SATISFIED
-	} else if kp < src.keyPrefix {
-		if src.pred.Op.IsDescending() {
-			return NOT_SATISFIED
-		} else {
-			return NOT_YET_SATISFIED
-		}
-	} else {
-		if src.pred.Op.IsDescending() {
-			return NOT_YET_SATISFIED
-		} else {
-			return NOT_SATISFIED
-		}
-	}
-}
-
-
-func (src *source) next() *common.Span {
-	for shardIdx := range src.shards {
-		src.populateNextFromShard(shardIdx)
-	}
-	var best *common.Span
-	bestIdx := -1
-	for shardIdx := range src.iters {
-		span := src.nexts[shardIdx]
-		if src.pred.spanPtrIsBefore(span, best) {
-			best = span
-			bestIdx = shardIdx
-		}
-	}
-	if bestIdx >= 0 {
-		src.nexts[bestIdx] = nil
-	}
-	return best
-}
-
-func (src *source) Close() {
-	for i := range src.iters {
-		if src.iters[i] != nil {
-			src.iters[i].Close()
-		}
-	}
-	src.iters = nil
-}
-
-func (src *source) getStats() string {
-	ret := fmt.Sprintf("Source stats: pred = %s", src.pred.String())
-	prefix := ". "
-	for shardIdx := range src.shards {
-		next := fmt.Sprintf("%sRead %d spans from %s", prefix,
-			src.numRead[shardIdx], src.shards[shardIdx].path)
-		prefix = ", "
-		ret = ret + next
-	}
-	return ret
-}
-
-func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) {
-	// Read spans from the first predicate that is indexed.
-	p := *preds
-	for i := range p {
-		pred := p[i]
-		if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
-			*preds = append(p[0:i], p[i+1:]...)
-			return pred.createSource(store, span)
-		}
-	}
-	// If there are no predicates that are indexed, read rows in order of span id.
-	spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS,
-		Field: common.SPAN_ID,
-		Val:   common.INVALID_SPAN_ID.String(),
-	}
-	spanIdPredData, err := loadPredicateData(&spanIdPred)
-	if err != nil {
-		return nil, err
-	}
-	return spanIdPredData.createSource(store, span)
-}
-
-func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error, []int) {
-	lg := store.lg
-	// Parse predicate data.
-	var err error
-	preds := make([]*predicateData, len(query.Predicates))
-	for i := range query.Predicates {
-		preds[i], err = loadPredicateData(&query.Predicates[i])
-		if err != nil {
-			return nil, err, nil
-		}
-	}
-	// Get a source of rows.
-	var src *source
-	src, err = store.obtainSource(&preds, query.Prev)
-	if err != nil {
-		return nil, err, nil
-	}
-	defer src.Close()
-	if lg.DebugEnabled() {
-		lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
-	}
-
-	// Filter the spans through the remaining predicates.
-	reserved := 32
-	if query.Lim < reserved {
-		reserved = query.Lim
-	}
-	ret := make([]*common.Span, 0, reserved)
-	for {
-		if len(ret) >= query.Lim {
-			if lg.DebugEnabled() {
-				lg.Debugf("HandleQuery %s: hit query limit after obtaining " +
-					"%d results. %s\n.", query, query.Lim, src.getStats())
-			}
-			break // we hit the result size limit
-		}
-		span := src.next()
-		if span == nil {
-			if lg.DebugEnabled() {
-				lg.Debugf("HandleQuery %s: found %d result(s), which are " +
-					"all that exist. %s\n", query, len(ret), src.getStats())
-			}
-			break // the source has no more spans to give
-		}
-		if lg.DebugEnabled() {
-			lg.Debugf("src.next returned span %s\n", span.ToJson())
-		}
-		satisfied := true
-		for predIdx := range preds {
-			if preds[predIdx].satisfiedBy(span) != SATISFIED {
-				satisfied = false
-				break
-			}
-		}
-		if satisfied {
-			ret = append(ret, span)
-		}
-	}
-	return ret, nil, src.numRead
-}
-
-func (store *dataStore) ServerStats() *common.ServerStats {
-	serverStats := common.ServerStats{
-		Dirs: make([]common.StorageDirectoryStats, len(store.shards)),
-	}
-	for shardIdx := range store.shards {
-		shard := store.shards[shardIdx]
-		serverStats.Dirs[shardIdx].Path = shard.path
-		r := levigo.Range{
-			Start: []byte{0},
-			Limit: []byte{0xff},
-		}
-		vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
-		serverStats.Dirs[shardIdx].ApproximateBytes = vals[0]
-		serverStats.Dirs[shardIdx].LevelDbStats =
-			shard.ldb.PropertyValue("leveldb.stats")
-		store.msink.lg.Debugf("levedb.stats for %s: %s\n",
-			shard.path, shard.ldb.PropertyValue("leveldb.stats"))
-	}
-	serverStats.LastStartMs = store.startMs
-	serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
-	serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
-	store.msink.PopulateServerStats(&serverStats)
-	return &serverStats
-}


Mime
View raw message