Return-Path: X-Original-To: apmail-htrace-commits-archive@minotaur.apache.org Delivered-To: apmail-htrace-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0DB67199FE for ; Tue, 19 Apr 2016 23:32:51 +0000 (UTC) Received: (qmail 10078 invoked by uid 500); 19 Apr 2016 23:32:50 -0000 Delivered-To: apmail-htrace-commits-archive@htrace.apache.org Received: (qmail 10055 invoked by uid 500); 19 Apr 2016 23:32:50 -0000 Mailing-List: contact commits-help@htrace.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@htrace.incubator.apache.org Delivered-To: mailing list commits@htrace.incubator.apache.org Received: (qmail 10046 invoked by uid 99); 19 Apr 2016 23:32:50 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Apr 2016 23:32:50 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 1ED53C12C1 for ; Tue, 19 Apr 2016 23:32:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.216 X-Spam-Level: X-Spam-Status: No, score=-4.216 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.996] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id Gxvn2HMFLkxT for ; Tue, 19 Apr 2016 23:32:46 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 1205D5F245 for ; Tue, 19 Apr 2016 23:32:44 +0000 (UTC) Received: (qmail 9992 invoked by uid 99); 19 Apr 2016 23:32:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Apr 2016 23:32:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 640A7DFF73; Tue, 19 Apr 2016 23:32:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: iwasakims@apache.org To: commits@htrace.incubator.apache.org Date: Tue, 19 Apr 2016 23:32:46 -0000 Message-Id: <35b604c229fc46cc9d5cf9b18cb700ba@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/7] incubator-htrace git commit: HTRACE-357. Rename htrace-htraced/go/src/org/apache/htrace to htrace-htraced/go/src/htrace (Colin Patrick McCabe via iwasakims) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go deleted file mode 100644 index 11e2733..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package common - -import ( - "testing" -) - -func testRoundTrip(t *testing.T, u int64) { - tme := UnixMsToTime(u) - u2 := TimeToUnixMs(tme) - if u2 != u { - t.Fatalf("Error taking %d on a round trip: came back as "+ - "%d instead.\n", u, u2) - } -} - -func TestTimeConversions(t *testing.T) { - testRoundTrip(t, 0) - testRoundTrip(t, 1445540632000) -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config.go b/htrace-htraced/go/src/org/apache/htrace/conf/config.go deleted file mode 100644 index 24170b2..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/conf/config.go +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package conf - -import ( - "bufio" - "bytes" - "fmt" - "io" - "log" - "os" - "path/filepath" - "sort" - "strconv" - "strings" - "syscall" -) - -// -// The configuration code for HTraced. -// -// HTraced can be configured via Hadoop-style XML configuration files, or by passing -Dkey=value -// command line arguments. Command-line arguments without an equals sign, such as "-Dkey", will be -// treated as setting the key to "true". -// -// Configuration key constants should be defined in config_keys.go. Each key should have a default, -// which will be used if the user supplies no value, or supplies an invalid value. -// For that reason, it is not necessary for the Get, GetInt, etc. functions to take a default value -// argument. -// -// Configuration objects are immutable. However, you can make a copy of a configuration which adds -// some changes using Configuration#Clone(). -// - -type Config struct { - settings map[string]string - defaults map[string]string -} - -type Builder struct { - // If non-nil, the XML configuration file to read. - Reader io.Reader - - // If non-nil, the configuration values to use. - Values map[string]string - - // If non-nil, the default configuration values to use. - Defaults map[string]string - - // If non-nil, the command-line arguments to use. - Argv []string - - // The name of the application. Configuration keys that start with this - // string will be converted to their unprefixed forms. - AppPrefix string -} - -func getDefaultHTracedConfDir() string { - return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf" -} - -func getHTracedConfDirs(dlog io.Writer) []string { - confDir := os.Getenv("HTRACED_CONF_DIR") - paths := filepath.SplitList(confDir) - if len(paths) < 1 { - def := getDefaultHTracedConfDir() - io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR defaulting to %s\n", def)) - return []string{def} - } - io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR=%s\n", confDir)) - return paths -} - -// Load a configuration from the application's argv, configuration file, and the standard -// defaults. -func LoadApplicationConfig(appPrefix string) (*Config, io.Reader) { - dlog := new(bytes.Buffer) - reader := openFile(CONFIG_FILE_NAME, getHTracedConfDirs(dlog), dlog) - bld := Builder{} - if reader != nil { - defer reader.Close() - bld.Reader = bufio.NewReader(reader) - } - bld.Argv = os.Args[1:] - bld.Defaults = DEFAULTS - bld.AppPrefix = appPrefix - cnf, err := bld.Build() - if err != nil { - log.Fatal("Error building configuration: " + err.Error()) - } - os.Args = append(os.Args[0:1], bld.Argv...) - keys := make(sort.StringSlice, 0, 20) - for k, _ := range cnf.settings { - keys = append(keys, k) - } - sort.Sort(keys) - prefix := "" - io.WriteString(dlog, "Read configuration: ") - for i := range keys { - io.WriteString(dlog, fmt.Sprintf(`%s%s = "%s"`, - prefix, keys[i], cnf.settings[keys[i]])) - prefix = ", " - } - return cnf, dlog -} - -// Attempt to open a configuration file somewhere on the provided list of paths. -func openFile(cnfName string, paths []string, dlog io.Writer) io.ReadCloser { - for p := range paths { - path := fmt.Sprintf("%s%c%s", paths[p], os.PathSeparator, cnfName) - file, err := os.Open(path) - if err == nil { - io.WriteString(dlog, fmt.Sprintf("Reading configuration from %s.\n", path)) - return file - } - if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOENT { - continue - } - io.WriteString(dlog, fmt.Sprintf("Error opening %s for read: %s\n", path, err.Error())) - } - return nil -} - -// Try to parse a command-line element as a key=value pair. -func parseAsConfigFlag(flag string) (string, string) { - var confPart string - if strings.HasPrefix(flag, "-D") { - confPart = flag[2:] - } else if strings.HasPrefix(flag, "--D") { - confPart = flag[3:] - } else { - return "", "" - } - if len(confPart) == 0 { - return "", "" - } - idx := strings.Index(confPart, "=") - if idx == -1 { - return confPart, "true" - } - return confPart[0:idx], confPart[idx+1:] -} - -// Build a new configuration object from the provided conf.Builder. -func (bld *Builder) Build() (*Config, error) { - // Load values and defaults - cnf := Config{} - cnf.settings = make(map[string]string) - if bld.Values != nil { - for k, v := range bld.Values { - cnf.settings[k] = v - } - } - cnf.defaults = make(map[string]string) - if bld.Defaults != nil { - for k, v := range bld.Defaults { - cnf.defaults[k] = v - } - } - - // Process the configuration file, if we have one - if bld.Reader != nil { - parseXml(bld.Reader, cnf.settings) - } - - // Process command line arguments - var i int - for i < len(bld.Argv) { - str := bld.Argv[i] - key, val := parseAsConfigFlag(str) - if key != "" { - cnf.settings[key] = val - bld.Argv = append(bld.Argv[:i], bld.Argv[i+1:]...) - } else { - i++ - } - } - cnf.settings = bld.removeApplicationPrefixes(cnf.settings) - cnf.defaults = bld.removeApplicationPrefixes(cnf.defaults) - return &cnf, nil -} - -func (bld *Builder) removeApplicationPrefixes(in map[string]string) map[string]string { - out := make(map[string]string) - for k, v := range in { - if strings.HasPrefix(k, bld.AppPrefix) { - out[k[len(bld.AppPrefix):]] = v - } else { - out[k] = v - } - } - return out -} - -// Returns true if the configuration has a non-default value for the given key. -func (cnf *Config) Contains(key string) bool { - _, ok := cnf.settings[key] - return ok -} - -// Get a string configuration key. -func (cnf *Config) Get(key string) string { - ret, hadKey := cnf.settings[key] - if hadKey { - return ret - } - return cnf.defaults[key] -} - -// Get a boolean configuration key. -func (cnf *Config) GetBool(key string) bool { - str := cnf.settings[key] - ret, err := strconv.ParseBool(str) - if err == nil { - return ret - } - str = cnf.defaults[key] - ret, err = strconv.ParseBool(str) - if err == nil { - return ret - } - return false -} - -// Get an integer configuration key. -func (cnf *Config) GetInt(key string) int { - str := cnf.settings[key] - ret, err := strconv.Atoi(str) - if err == nil { - return ret - } - str = cnf.defaults[key] - ret, err = strconv.Atoi(str) - if err == nil { - return ret - } - return 0 -} - -// Get an int64 configuration key. -func (cnf *Config) GetInt64(key string) int64 { - str := cnf.settings[key] - ret, err := strconv.ParseInt(str, 10, 64) - if err == nil { - return ret - } - str = cnf.defaults[key] - ret, err = strconv.ParseInt(str, 10, 64) - if err == nil { - return ret - } - return 0 -} - -// Make a deep copy of the given configuration. -// Optionally, you can specify particular key/value pairs to change. -// Example: -// cnf2 := cnf.Copy("my.changed.key", "my.new.value") -func (cnf *Config) Clone(args ...string) *Config { - if len(args)%2 != 0 { - panic("The arguments to Config#copy are key1, value1, " + - "key2, value2, and so on. You must specify an even number of arguments.") - } - ncnf := &Config{defaults: cnf.defaults} - ncnf.settings = make(map[string]string) - for k, v := range cnf.settings { - ncnf.settings[k] = v - } - for i := 0; i < len(args); i += 2 { - ncnf.settings[args[i]] = args[i+1] - } - return ncnf -} - -// Export the configuration as a map -func (cnf *Config) Export() map[string]string { - m := make(map[string]string) - for k, v := range cnf.defaults { - m[k] = v - } - for k, v := range cnf.settings { - m[k] = v - } - return m -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go deleted file mode 100644 index 16790d8..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package conf - -import ( - "fmt" - "os" -) - -// -// Configuration keys for HTrace. -// - -// The platform-specific path separator. Usually slash. -var PATH_SEP string = fmt.Sprintf("%c", os.PathSeparator) - -// The platform-specific path list separator. Usually colon. -var PATH_LIST_SEP string = fmt.Sprintf("%c", os.PathListSeparator) - -// The name of the XML configuration file to look for. -const CONFIG_FILE_NAME = "htraced-conf.xml" - -// An environment variable containing a list of paths to search for the -// configuration file in. -const HTRACED_CONF_DIR = "HTRACED_CONF_DIR" - -// The web address to start the REST server on. -const HTRACE_WEB_ADDRESS = "web.address" - -// The default port for the Htrace web address. -const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9096 - -// The web address to start the REST server on. -const HTRACE_HRPC_ADDRESS = "hrpc.address" - -// The default port for the Htrace HRPC address. -const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075 - -// The directories to put the data store into. Separated by PATH_LIST_SEP. -const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories" - -// Boolean key which indicates whether we should clear data on startup. -const HTRACE_DATA_STORE_CLEAR = "data.store.clear" - -// How many writes to buffer before applying backpressure to span senders. -const HTRACE_DATA_STORE_SPAN_BUFFER_SIZE = "data.store.span.buffer.size" - -// Path to put the logs from htrace, or the empty string to use stdout. -const HTRACE_LOG_PATH = "log.path" - -// The log level to use for the logs in htrace. -const HTRACE_LOG_LEVEL = "log.level" - -// The period between datastore heartbeats. This is the approximate interval at which we will -// prune expired spans. -const HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS = "datastore.heartbeat.period.ms" - -// The maximum number of addresses for which we will maintain metrics. -const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries" - -// The number of milliseconds we should keep spans before discarding them. -const HTRACE_SPAN_EXPIRY_MS = "span.expiry.ms" - -// The period between updates to the span reaper -const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms" - -// A host:port pair to send information to on startup. This is used in unit -// tests to determine the (random) port of the htraced process that has been -// started. -const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address" - -// The maximum number of HRPC handler goroutines we will create at once. If -// this is too small, we won't get enough concurrency; if it's too big, we will -// buffer too much data in memory while waiting for the datastore to process -// requests. -const HTRACE_NUM_HRPC_HANDLERS = "num.hrpc.handlers" - -// The I/O timeout HRPC will use, in milliseconds. If it takes longer than -// this to read or write a message, we will abort the connection. -const HTRACE_HRPC_IO_TIMEOUT_MS = "hrpc.io.timeout.ms" - -// The leveldb write buffer size, or 0 to use the library default, which is 4 -// MB in leveldb 1.16. See leveldb's options.h for more details. -const HTRACE_LEVELDB_WRITE_BUFFER_SIZE = "leveldb.write.buffer.size" - -// The LRU cache size for leveldb, in bytes. -const HTRACE_LEVELDB_CACHE_SIZE = "leveldb.cache.size" - -// Default values for HTrace configuration keys. -var DEFAULTS = map[string]string{ - HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT), - HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_HRPC_ADDRESS_DEFAULT_PORT), - HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" + - PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2", - HTRACE_DATA_STORE_CLEAR: "false", - HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100", - HTRACE_LOG_PATH: "", - HTRACE_LOG_LEVEL: "INFO", - HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000), - HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000", - HTRACE_SPAN_EXPIRY_MS: "0", - HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000), - HTRACE_NUM_HRPC_HANDLERS: "20", - HTRACE_HRPC_IO_TIMEOUT_MS: "60000", - HTRACE_LEVELDB_WRITE_BUFFER_SIZE: "0", - HTRACE_LEVELDB_CACHE_SIZE: fmt.Sprintf("%d", 100 * 1024 * 1024), -} - -// Values to be used when creating test configurations -func TEST_VALUES() map[string]string { - return map[string]string{ - HTRACE_HRPC_ADDRESS: ":0", // use a random port for the HRPC server - HTRACE_LOG_LEVEL: "TRACE", // show all log messages in tests - HTRACE_WEB_ADDRESS: ":0", // use a random port for the REST server - HTRACE_SPAN_EXPIRY_MS: "0", // never time out spans (unless testing the reaper) - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go deleted file mode 100644 index a681136..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package conf - -import ( - "bytes" - "os" - "strings" - "testing" -) - -// Test that parsing command-line arguments of the form -Dfoo=bar works. -func TestParseArgV(t *testing.T) { - t.Parallel() - argv := []string{"-Dfoo=bar", "-Dbaz=123", "-DsillyMode", "-Dlog.path="} - bld := &Builder{Argv: argv, - Defaults:map[string]string { - "log.path": "/log/path/default", - }} - cnf, err := bld.Build() - if err != nil { - t.Fatal() - } - if "bar" != cnf.Get("foo") { - t.Fatal() - } - if 123 != cnf.GetInt("baz") { - t.Fatal() - } - if !cnf.GetBool("sillyMode") { - t.Fatal() - } - if cnf.GetBool("otherSillyMode") { - t.Fatal() - } - if "" != cnf.Get("log.path") { - t.Fatal() - } -} - -// Test that default values work. -// Defaults are used only when the configuration option is not present or can't be parsed. -func TestDefaults(t *testing.T) { - t.Parallel() - argv := []string{"-Dfoo=bar", "-Dbaz=invalidNumber"} - defaults := map[string]string{ - "foo": "notbar", - "baz": "456", - "foo2": "4611686018427387904", - } - bld := &Builder{Argv: argv, Defaults: defaults} - cnf, err := bld.Build() - if err != nil { - t.Fatal() - } - if "bar" != cnf.Get("foo") { - t.Fatal() - } - if 456 != cnf.GetInt("baz") { - t.Fatal() - } - if 4611686018427387904 != cnf.GetInt64("foo2") { - t.Fatal() - } -} - -// Test that we can parse our XML configuration file. -func TestXmlConfigurationFile(t *testing.T) { - t.Parallel() - xml := ` - - - - - foo.bar - 123 - - - foo.baz - xmlValue - - - -` - xmlReader := strings.NewReader(xml) - argv := []string{"-Dfoo.bar=456"} - defaults := map[string]string{ - "foo.bar": "789", - "cmdline.opt": "4611686018427387904", - } - bld := &Builder{Argv: argv, Defaults: defaults, Reader: xmlReader} - cnf, err := bld.Build() - if err != nil { - t.Fatal() - } - // The command-line argument takes precedence over the XML and the defaults. - if 456 != cnf.GetInt("foo.bar") { - t.Fatal() - } - if "xmlValue" != cnf.Get("foo.baz") { - t.Fatalf("foo.baz = %s", cnf.Get("foo.baz")) - } - if "" != cnf.Get("commented.out") { - t.Fatal() - } - if 4611686018427387904 != cnf.GetInt64("cmdline.opt") { - t.Fatal() - } -} - -// Test our handling of the HTRACE_CONF_DIR environment variable. -func TestGetHTracedConfDirs(t *testing.T) { - os.Setenv("HTRACED_CONF_DIR", "") - dlog := new(bytes.Buffer) - dirs := getHTracedConfDirs(dlog) - if len(dirs) != 1 || dirs[0] != getDefaultHTracedConfDir() { - t.Fatal() - } - os.Setenv("HTRACED_CONF_DIR", "/foo/bar:/baz") - dirs = getHTracedConfDirs(dlog) - if len(dirs) != 2 || dirs[0] != "/foo/bar" || dirs[1] != "/baz" { - t.Fatal() - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/xml.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/xml.go b/htrace-htraced/go/src/org/apache/htrace/conf/xml.go deleted file mode 100644 index de14bc5..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/conf/xml.go +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package conf - -import ( - "encoding/xml" - "io" - "log" -) - -type configuration struct { - Properties []propertyXml `xml:"property"` -} - -type propertyXml struct { - Name string `xml:"name"` - Value string `xml:"value"` -} - -// Parse an XML configuration file. -func parseXml(reader io.Reader, m map[string]string) error { - dec := xml.NewDecoder(reader) - configurationXml := configuration{} - err := dec.Decode(&configurationXml) - if err != nil { - return err - } - props := configurationXml.Properties - for p := range props { - key := props[p].Name - value := props[p].Value - if key == "" { - log.Println("Warning: ignoring element with missing or empty .") - continue - } - if value == "" { - log.Println("Warning: ignoring element with key " + key + " with missing or empty .") - continue - } - //log.Printf("setting %s to %s\n", key, value) - m[key] = value - } - return nil -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go deleted file mode 100644 index 7b64914..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go +++ /dev/null @@ -1,484 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "fmt" - "github.com/ugorji/go/codec" - "math" - "math/rand" - htrace "org/apache/htrace/client" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "org/apache/htrace/test" - "sort" - "sync" - "sync/atomic" - "testing" - "time" -) - -func TestClientGetServerVersion(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerVersion", - DataDirs: make([]string, 2)} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - defer hcl.Close() - _, err = hcl.GetServerVersion() - if err != nil { - t.Fatalf("failed to call GetServerVersion: %s", err.Error()) - } -} - -func TestClientGetServerDebugInfo(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerDebugInfo", - DataDirs: make([]string, 2)} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - defer hcl.Close() - debugInfo, err := hcl.GetServerDebugInfo() - if err != nil { - t.Fatalf("failed to call GetServerDebugInfo: %s", err.Error()) - } - if debugInfo.StackTraces == "" { - t.Fatalf(`debugInfo.StackTraces == ""`) - } - if debugInfo.GCStats == "" { - t.Fatalf(`debugInfo.GCStats == ""`) - } -} - -func createRandomTestSpans(amount int) common.SpanSlice { - rnd := rand.New(rand.NewSource(2)) - allSpans := make(common.SpanSlice, amount) - allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0]) - for i := 1; i < amount; i++ { - allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i]) - } - allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)} - return allSpans -} - -func TestClientOperations(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations", - DataDirs: make([]string, 2), - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - defer hcl.Close() - - // Create some random trace spans. - NUM_TEST_SPANS := 30 - allSpans := createRandomTestSpans(NUM_TEST_SPANS) - - // Write half of the spans to htraced via the client. - err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2]) - if err != nil { - t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2, - err.Error()) - } - ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS / 2)) - - // Look up the first half of the spans. They should be found. - var span *common.Span - for i := 0; i < NUM_TEST_SPANS/2; i++ { - span, err = hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - common.ExpectSpansEqual(t, allSpans[i], span) - } - - // Look up the second half of the spans. They should not be found. - for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ { - span, err = hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - if span != nil { - t.Fatalf("Unexpectedly found a span we never write to "+ - "the server: FindSpan(%d) succeeded\n", i) - } - } - - // Test FindChildren - childSpan := allSpans[1] - parentId := childSpan.Parents[0] - var children []common.SpanId - children, err = hcl.FindChildren(parentId, 1) - if err != nil { - t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error()) - } - if len(children) != 1 { - t.Fatalf("FindChildren(%s) returned an invalid number of "+ - "children: expected %d, got %d\n", parentId, 1, len(children)) - } - if !children[0].Equal(childSpan.Id) { - t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+ - " got %s\n", parentId, childSpan.Id, children[0]) - } - - // Test FindChildren on a span that has no children - childlessSpan := allSpans[NUM_TEST_SPANS/2] - children, err = hcl.FindChildren(childlessSpan.Id, 10) - if err != nil { - t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, err.Error()) - } - if len(children) != 0 { - t.Fatalf("FindChildren(%d) returned an invalid number of "+ - "children: expected %d, got %d\n", childlessSpan.Id, 0, len(children)) - } - - // Test Query - var query common.Query - query = common.Query{Lim: 10} - spans, err := hcl.Query(&query) - if err != nil { - t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error()) - } - if len(spans) != 10 { - t.Fatalf("Query({lim: %d}) returned an invalid number of "+ - "children: expected %d, got %d\n", 10, 10, len(spans)) - } -} - -func TestDumpAll(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll", - DataDirs: make([]string, 2), - WrittenSpans: common.NewSemaphore(0), - Cnf: map[string]string{ - conf.HTRACE_LOG_LEVEL: "INFO", - }, - } - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - defer hcl.Close() - - NUM_TEST_SPANS := 100 - allSpans := createRandomTestSpans(NUM_TEST_SPANS) - sort.Sort(allSpans) - err = hcl.WriteSpans(allSpans) - if err != nil { - t.Fatalf("WriteSpans failed: %s\n", err.Error()) - } - ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS)) - out := make(chan *common.Span, NUM_TEST_SPANS) - var dumpErr error - go func() { - dumpErr = hcl.DumpAll(3, out) - }() - var numSpans int - nextLogTime := time.Now().Add(time.Millisecond * 5) - for { - span, channelOpen := <-out - if !channelOpen { - break - } - common.ExpectSpansEqual(t, allSpans[numSpans], span) - numSpans++ - if testing.Verbose() { - now := time.Now() - if !now.Before(nextLogTime) { - nextLogTime = now - nextLogTime = nextLogTime.Add(time.Millisecond * 5) - fmt.Printf("read back %d span(s)...\n", numSpans) - } - } - } - if numSpans != len(allSpans) { - t.Fatalf("expected to read %d spans... but only read %d\n", - len(allSpans), numSpans) - } - if dumpErr != nil { - t.Fatalf("got dump error %s\n", dumpErr.Error()) - } -} - -const EXAMPLE_CONF_KEY = "example.conf.key" -const EXAMPLE_CONF_VALUE = "foo.bar.baz" - -func TestClientGetServerConf(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf", - Cnf: map[string]string{ - EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE, - }, - DataDirs: make([]string, 2)} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - defer hcl.Close() - serverCnf, err2 := hcl.GetServerConf() - if err2 != nil { - t.Fatalf("failed to call GetServerConf: %s", err2.Error()) - } - if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE { - t.Fatalf("unexpected value for %s: %s", - EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE) - } -} - -const TEST_NUM_HRPC_HANDLERS = 2 - -const TEST_NUM_WRITESPANS = 4 - -// Tests that HRPC limits the number of simultaneous connections being processed. -func TestHrpcAdmissionsControl(t *testing.T) { - var wg sync.WaitGroup - wg.Add(TEST_NUM_WRITESPANS) - var numConcurrentHrpcCalls int32 - testHooks := &hrpcTestHooks{ - HandleAdmission: func() { - defer wg.Done() - n := atomic.AddInt32(&numConcurrentHrpcCalls, 1) - if n > TEST_NUM_HRPC_HANDLERS { - t.Fatalf("The number of concurrent HRPC calls went above "+ - "%d: it's at %d\n", TEST_NUM_HRPC_HANDLERS, n) - } - time.Sleep(1 * time.Millisecond) - n = atomic.AddInt32(&numConcurrentHrpcCalls, -1) - if n >= TEST_NUM_HRPC_HANDLERS { - t.Fatalf("The number of concurrent HRPC calls went above "+ - "%d: it was at %d\n", TEST_NUM_HRPC_HANDLERS, n+1) - } - }, - } - htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl", - DataDirs: make([]string, 2), - Cnf: map[string]string{ - conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS), - }, - WrittenSpans: common.NewSemaphore(0), - HrpcTestHooks: testHooks, - } - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - // Create some random trace spans. - allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS) - for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ { - go func(i int) { - err = hcl.WriteSpans(allSpans[i : i+1]) - if err != nil { - t.Fatalf("WriteSpans failed: %s\n", err.Error()) - } - }(iter) - } - wg.Wait() - ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS)) -} - -// Tests that HRPC I/O timeouts work. -func TestHrpcIoTimeout(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout", - DataDirs: make([]string, 2), - Cnf: map[string]string{ - conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS), - conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1", - }, - } - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - finishClient := make(chan interface{}) - defer func() { - // Close the finishClient channel, if it hasn't already been closed. - defer func() { recover() }() - close(finishClient) - }() - testHooks := &htrace.TestHooks{ - HandleWriteRequestBody: func() { - <-finishClient - }, - } - hcl, err = htrace.NewClient(ht.ClientConf(), testHooks) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - // Create some random trace spans. - allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS) - var wg sync.WaitGroup - wg.Add(TEST_NUM_WRITESPANS) - for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ { - go func(i int) { - defer wg.Done() - // Ignore the error return because there are internal retries in - // the client which will make this succeed eventually, usually. - // Keep in mind that we only block until we have seen - // TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that, - // we let requests through so that the test can exit cleanly. - hcl.WriteSpans(allSpans[i : i+1]) - }(iter) - } - for { - if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS { - break - } - time.Sleep(1000 * time.Nanosecond) - } - close(finishClient) - wg.Wait() -} - -func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) { - htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans", - Cnf: map[string]string{ - conf.HTRACE_LOG_LEVEL: "INFO", - conf.HTRACE_NUM_HRPC_HANDLERS: "20", - }, - WrittenSpans: common.NewSemaphore(int64(1 - N)), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - rnd := rand.New(rand.NewSource(1)) - allSpans := make([]*common.Span, N) - for n := 0; n < N; n++ { - allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n]) - } - // Determine how many calls to WriteSpans we should make. Each writeSpans - // message should be small enough so that it doesn't exceed the max RPC - // body length limit. TODO: a production-quality golang client would do - // this internally rather than needing us to do it here in the unit test. - bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5 - reqs := make([][]*common.Span, 0, 4) - curReq := -1 - curReqLen := bodyLen - var curReqSpans uint32 - mh := new(codec.MsgpackHandle) - mh.WriteExt = true - var mbuf [8192]byte - buf := mbuf[:0] - enc := codec.NewEncoderBytes(&buf, mh) - for n := 0; n < N; n++ { - span := allSpans[n] - if (curReqSpans >= maxSpansPerRpc) || - (curReqLen >= bodyLen) { - reqs = append(reqs, make([]*common.Span, 0, 16)) - curReqLen = 0 - curReq++ - curReqSpans = 0 - } - buf = mbuf[:0] - enc.ResetBytes(&buf) - err := enc.Encode(span) - if err != nil { - panic(fmt.Sprintf("Error encoding span %s: %s\n", - span.String(), err.Error())) - } - bufLen := len(buf) - if bufLen > (bodyLen / 5) { - panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen)) - } - curReqLen += bufLen - reqs[curReq] = append(reqs[curReq], span) - curReqSpans++ - } - ht.Store.lg.Infof("num spans: %d. num WriteSpansReq calls: %d\n", N, len(reqs)) - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - panic(fmt.Sprintf("failed to create client: %s", err.Error())) - } - defer hcl.Close() - - // Reset the timer to avoid including the time required to create new - // random spans in the benchmark total. - if b != nil { - b.ResetTimer() - } - - // Write many random spans. - for reqIdx := range reqs { - go func(i int) { - err = hcl.WriteSpans(reqs[i]) - if err != nil { - panic(fmt.Sprintf("failed to send WriteSpans request %d: %s", - i, err.Error())) - } - }(reqIdx) - } - // Wait for all the spans to be written. - ht.Store.WrittenSpans.Wait() -} - -// This is a test of how quickly we can create new spans via WriteSpans RPCs. -// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore. -// Unlike that benchmark, it sends the spans via RPC. -// Suggested flags for running this: -// -tags unsafe -cpu 16 -benchtime=1m -func BenchmarkWriteSpans(b *testing.B) { - doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b) -} - -func TestWriteSpansRpcs(t *testing.T) { - doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil) -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go deleted file mode 100644 index 82fb7b5..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ /dev/null @@ -1,1340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bytes" - "encoding/hex" - "errors" - "fmt" - "github.com/jmhodges/levigo" - "github.com/ugorji/go/codec" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" -) - -// -// The data store code for HTraced. -// -// This code stores the trace spans. We use levelDB here so that we don't have to store everything -// in memory at all times. The data is sharded across multiple levelDB databases in multiple -// directories. Normally, these multiple directories will be on multiple disk drives. -// -// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data -// coming from many daemons. Durability is not as big a concern as in some data stores, since -// losing a little bit of trace data if htraced goes down is not critical. We use msgpack -// for serialization. We assume that there will be many more writes than reads. -// -// Schema -// w -> ShardInfo -// s[8-byte-big-endian-sid] -> SpanData -// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {} -// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {} -// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {} -// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {} -// -// Note that span IDs are unsigned 64-bit numbers. -// Begin times, end times, and durations are signed 64-bit numbers. -// In order to get LevelDB to properly compare the signed 64-bit quantities, -// we flip the highest bit. This way, we can get leveldb to view negative -// quantities as less than non-negative ones. This also means that we can do -// all queries using unsigned 64-bit math, rather than having to special-case -// the signed fields. -// - -var EMPTY_BYTE_BUF []byte = []byte{} - -const SPAN_ID_INDEX_PREFIX = 's' -const BEGIN_TIME_INDEX_PREFIX = 'b' -const END_TIME_INDEX_PREFIX = 'e' -const DURATION_INDEX_PREFIX = 'd' -const PARENT_ID_INDEX_PREFIX = 'p' -const INVALID_INDEX_PREFIX = 0 - -// The maximum span expiry time, in milliseconds. -// For all practical purposes this is "never" since it's more than a million years. -const MAX_SPAN_EXPIRY_MS = 0x7ffffffffffffff - -type IncomingSpan struct { - // The address that the span was sent from. - Addr string - - // The span. - *common.Span - - // Serialized span data - SpanDataBytes []byte -} - -// A single directory containing a levelDB instance. -type shard struct { - // The data store that this shard is part of - store *dataStore - - // The LevelDB instance. - ldb *levigo.DB - - // The path to the leveldb directory this shard is managing. - path string - - // Incoming requests to write Spans. - incoming chan []*IncomingSpan - - // A channel for incoming heartbeats - heartbeats chan interface{} - - // Tracks whether the shard goroutine has exited. - exited sync.WaitGroup -} - -// Process incoming spans for a shard. -func (shd *shard) processIncoming() { - lg := shd.store.lg - defer func() { - lg.Infof("Shard processor for %s exiting.\n", shd.path) - shd.exited.Done() - }() - for { - select { - case spans := <-shd.incoming: - if spans == nil { - return - } - totalWritten := 0 - totalDropped := 0 - for spanIdx := range spans { - err := shd.writeSpan(spans[spanIdx]) - if err != nil { - lg.Errorf("Shard processor for %s got fatal error %s.\n", - shd.path, err.Error()) - totalDropped++ - } else { - if lg.TraceEnabled() { - lg.Tracef("Shard processor for %s wrote span %s.\n", - shd.path, spans[spanIdx].ToJson()) - } - totalWritten++ - } - } - shd.store.msink.UpdatePersisted(spans[0].Addr, totalWritten, totalDropped) - if shd.store.WrittenSpans != nil { - lg.Debugf("Shard %s incrementing WrittenSpans by %d\n", shd.path, len(spans)) - shd.store.WrittenSpans.Posts(int64(len(spans))) - } - case <-shd.heartbeats: - lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path) - shd.pruneExpired() - } - } -} - -func (shd *shard) pruneExpired() { - lg := shd.store.rpr.lg - src, err := CreateReaperSource(shd) - if err != nil { - lg.Errorf("Error creating reaper source for shd(%s): %s\n", - shd.path, err.Error()) - return - } - var totalReaped uint64 - defer func() { - src.Close() - if totalReaped > 0 { - atomic.AddUint64(&shd.store.rpr.ReapedSpans, totalReaped) - } - }() - urdate := s2u64(shd.store.rpr.GetReaperDate()) - for { - span := src.next() - if span == nil { - lg.Debugf("After reaping %d span(s), no more found in shard %s "+ - "to reap.\n", totalReaped, shd.path) - return - } - begin := s2u64(span.Begin) - if begin >= urdate { - lg.Debugf("After reaping %d span(s), the remaining spans in "+ - "shard %s are new enough to be kept\n", - totalReaped, shd.path) - return - } - err = shd.DeleteSpan(span) - if err != nil { - lg.Errorf("Error deleting span %s from shd(%s): %s\n", - span.String(), shd.path, err.Error()) - return - } - if lg.TraceEnabled() { - lg.Tracef("Reaped span %s from shard %s\n", span.String(), shd.path) - } - totalReaped++ - } -} - -// Delete a span from the shard. Note that leveldb may retain the data until -// compaction(s) remove it. -func (shd *shard) DeleteSpan(span *common.Span) error { - batch := levigo.NewWriteBatch() - defer batch.Close() - primaryKey := - append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...) - batch.Delete(primaryKey) - for parentIdx := range span.Parents { - key := append(append([]byte{PARENT_ID_INDEX_PREFIX}, - span.Parents[parentIdx].Val()...), span.Id.Val()...) - batch.Delete(key) - } - beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX}, - u64toSlice(s2u64(span.Begin))...), span.Id.Val()...) - batch.Delete(beginTimeKey) - endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX}, - u64toSlice(s2u64(span.End))...), span.Id.Val()...) - batch.Delete(endTimeKey) - durationKey := append(append([]byte{DURATION_INDEX_PREFIX}, - u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...) - batch.Delete(durationKey) - err := shd.ldb.Write(shd.store.writeOpts, batch) - if err != nil { - return err - } - return nil -} - -// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the -// highest bit, so that negative input values map to unsigned numbers which are -// less than non-negative input values. -func s2u64(val int64) uint64 { - ret := uint64(val) - ret ^= 0x8000000000000000 - return ret -} - -func u64toSlice(val uint64) []byte { - return []byte{ - byte(0xff & (val >> 56)), - byte(0xff & (val >> 48)), - byte(0xff & (val >> 40)), - byte(0xff & (val >> 32)), - byte(0xff & (val >> 24)), - byte(0xff & (val >> 16)), - byte(0xff & (val >> 8)), - byte(0xff & (val >> 0))} -} - -func (shd *shard) writeSpan(ispan *IncomingSpan) error { - batch := levigo.NewWriteBatch() - defer batch.Close() - span := ispan.Span - primaryKey := - append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...) - batch.Put(primaryKey, ispan.SpanDataBytes) - - // Add this to the parent index. - for parentIdx := range span.Parents { - key := append(append([]byte{PARENT_ID_INDEX_PREFIX}, - span.Parents[parentIdx].Val()...), span.Id.Val()...) - batch.Put(key, EMPTY_BYTE_BUF) - } - - // Add to the other secondary indices. - beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX}, - u64toSlice(s2u64(span.Begin))...), span.Id.Val()...) - batch.Put(beginTimeKey, EMPTY_BYTE_BUF) - endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX}, - u64toSlice(s2u64(span.End))...), span.Id.Val()...) - batch.Put(endTimeKey, EMPTY_BYTE_BUF) - durationKey := append(append([]byte{DURATION_INDEX_PREFIX}, - u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...) - batch.Put(durationKey, EMPTY_BYTE_BUF) - - err := shd.ldb.Write(shd.store.writeOpts, batch) - if err != nil { - shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n", - span.String(), shd.path, err.Error()) - return err - } - return nil -} - -func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId, - lim int32) ([]common.SpanId, int32, error) { - searchKey := append([]byte{PARENT_ID_INDEX_PREFIX}, sid.Val()...) - iter := shd.ldb.NewIterator(shd.store.readOpts) - defer iter.Close() - iter.Seek(searchKey) - for { - if !iter.Valid() { - break - } - if lim == 0 { - break - } - key := iter.Key() - if !bytes.HasPrefix(key, searchKey) { - break - } - id := common.SpanId(key[17:]) - childIds = append(childIds, id) - lim-- - iter.Next() - } - return childIds, lim, nil -} - -// Close a shard. -func (shd *shard) Close() { - lg := shd.store.lg - shd.incoming <- nil - lg.Infof("Waiting for %s to exit...\n", shd.path) - shd.exited.Wait() - shd.ldb.Close() - lg.Infof("Closed %s...\n", shd.path) -} - -type Reaper struct { - // The logger used by the reaper - lg *common.Logger - - // The number of milliseconds to keep spans around, in milliseconds. - spanExpiryMs int64 - - // The oldest date for which we'll keep spans. - reaperDate int64 - - // A channel used to send heartbeats to the reaper - heartbeats chan interface{} - - // Tracks whether the reaper goroutine has exited - exited sync.WaitGroup - - // The lock protecting reaper data. - lock sync.Mutex - - // The reaper heartbeater - hb *Heartbeater - - // The total number of spans which have been reaped. - ReapedSpans uint64 -} - -func NewReaper(cnf *conf.Config) *Reaper { - rpr := &Reaper{ - lg: common.NewLogger("reaper", cnf), - spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS), - heartbeats: make(chan interface{}, 1), - } - if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS { - rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS - } else if rpr.spanExpiryMs <= 0 { - rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS - } - rpr.hb = NewHeartbeater("ReaperHeartbeater", - cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg) - rpr.exited.Add(1) - go rpr.run() - rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{ - name: "reaper", - targetChan: rpr.heartbeats, - }) - var when string - if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS { - when = "never" - } else { - when = "after " + time.Duration(rpr.spanExpiryMs).String() - } - rpr.lg.Infof("Initializing span reaper: span time out = %s.\n", when) - return rpr -} - -func (rpr *Reaper) run() { - defer func() { - rpr.lg.Info("Exiting Reaper goroutine.\n") - rpr.exited.Done() - }() - - for { - _, isOpen := <-rpr.heartbeats - if !isOpen { - return - } - rpr.handleHeartbeat() - } -} - -func (rpr *Reaper) handleHeartbeat() { - // TODO: check dataStore fullness - now := common.TimeToUnixMs(time.Now().UTC()) - d, updated := func() (int64, bool) { - rpr.lock.Lock() - defer rpr.lock.Unlock() - newReaperDate := now - rpr.spanExpiryMs - if newReaperDate > rpr.reaperDate { - rpr.reaperDate = newReaperDate - return rpr.reaperDate, true - } else { - return rpr.reaperDate, false - } - }() - if rpr.lg.DebugEnabled() { - if updated { - rpr.lg.Debugf("Updating UTC reaper date to %s.\n", - common.UnixMsToTime(d).Format(time.RFC3339)) - } else { - rpr.lg.Debugf("Not updating previous reaperDate of %s.\n", - common.UnixMsToTime(d).Format(time.RFC3339)) - } - } -} - -func (rpr *Reaper) GetReaperDate() int64 { - rpr.lock.Lock() - defer rpr.lock.Unlock() - return rpr.reaperDate -} - -func (rpr *Reaper) SetReaperDate(rdate int64) { - rpr.lock.Lock() - defer rpr.lock.Unlock() - rpr.reaperDate = rdate -} - -func (rpr *Reaper) Shutdown() { - rpr.hb.Shutdown() - close(rpr.heartbeats) -} - -// The Data Store. -type dataStore struct { - lg *common.Logger - - // The shards which manage our LevelDB instances. - shards []*shard - - // The read options to use for LevelDB. - readOpts *levigo.ReadOptions - - // The write options to use for LevelDB. - writeOpts *levigo.WriteOptions - - // If non-null, a semaphore we will increment once for each span we receive. - // Used for testing. - WrittenSpans *common.Semaphore - - // The metrics sink. - msink *MetricsSink - - // The heartbeater which periodically asks shards to update the MetricsSink. - hb *Heartbeater - - // The reaper for this datastore - rpr *Reaper - - // When this datastore was started (in UTC milliseconds since the epoch) - startMs int64 -} - -func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) { - dld := NewDataStoreLoader(cnf) - defer dld.Close() - err := dld.Load() - if err != nil { - dld.lg.Errorf("Error loading datastore: %s\n", err.Error()) - return nil, err - } - store := &dataStore { - lg: dld.lg, - shards: make([]*shard, len(dld.shards)), - readOpts: dld.readOpts, - writeOpts: dld.writeOpts, - WrittenSpans: writtenSpans, - msink: NewMetricsSink(cnf), - hb: NewHeartbeater("DatastoreHeartbeater", - cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), dld.lg), - rpr: NewReaper(cnf), - startMs: common.TimeToUnixMs(time.Now().UTC()), - } - spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) - for shdIdx := range store.shards { - shd := &shard { - store: store, - ldb: dld.shards[shdIdx].ldb, - path: dld.shards[shdIdx].path, - incoming: make(chan []*IncomingSpan, spanBufferSize), - heartbeats: make(chan interface{}, 1), - } - shd.exited.Add(1) - go shd.processIncoming() - store.shards[shdIdx] = shd - store.hb.AddHeartbeatTarget(&HeartbeatTarget{ - name: fmt.Sprintf("shard(%s)", shd.path), - targetChan: shd.heartbeats, - }) - } - dld.DisownResources() - return store, nil -} - -// Close the DataStore. -func (store *dataStore) Close() { - if store.hb != nil { - store.hb.Shutdown() - store.hb = nil - } - for idx := range store.shards { - if store.shards[idx] != nil { - store.shards[idx].Close() - store.shards[idx] = nil - } - } - if store.rpr != nil { - store.rpr.Shutdown() - store.rpr = nil - } - if store.readOpts != nil { - store.readOpts.Close() - store.readOpts = nil - } - if store.writeOpts != nil { - store.writeOpts.Close() - store.writeOpts = nil - } - if store.lg != nil { - store.lg.Close() - store.lg = nil - } -} - -// Get the index of the shard which stores the given spanId. -func (store *dataStore) getShardIndex(sid common.SpanId) int { - return int(sid.Hash32() % uint32(len(store.shards))) -} - -const WRITESPANS_BATCH_SIZE = 128 - -// SpanIngestor is a class used internally to ingest spans from an RPC -// endpoint. It groups spans destined for a particular shard into small -// batches, so that we can reduce the number of objects that need to be sent -// over the shard's "incoming" channel. Since sending objects over a channel -// requires goroutine synchronization, this improves performance. -// -// SpanIngestor also allows us to reuse the same encoder object for many spans, -// rather than creating a new encoder per span. This avoids re-doing the -// encoder setup for each span, and also generates less garbage. -type SpanIngestor struct { - // The logger to use. - lg *common.Logger - - // The dataStore we are ingesting spans into. - store *dataStore - - // The remote address these spans are coming from. - addr string - - // Default TracerId - defaultTrid string - - // The msgpack handle to use to serialize the spans. - mh codec.MsgpackHandle - - // The msgpack encoder to use to serialize the spans. - // Caching this avoids generating a lot of garbage and burning CPUs - // creating new encoder objects for each span. - enc *codec.Encoder - - // The buffer which codec.Encoder is currently serializing to. - // We have to create a new buffer for each span because once we hand it off to the shard, the - // shard manages the buffer lifecycle. - spanDataBytes []byte - - // An array mapping shard index to span batch. - batches []*SpanIngestorBatch - - // The total number of spans ingested. Includes dropped spans. - totalIngested int - - // The total number of spans the ingestor dropped because of a server-side error. - serverDropped int -} - -// A batch of spans destined for a particular shard. -type SpanIngestorBatch struct { - incoming []*IncomingSpan -} - -func (store *dataStore) NewSpanIngestor(lg *common.Logger, - addr string, defaultTrid string) *SpanIngestor { - ing := &SpanIngestor{ - lg: lg, - store: store, - addr: addr, - defaultTrid: defaultTrid, - spanDataBytes: make([]byte, 0, 1024), - batches: make([]*SpanIngestorBatch, len(store.shards)), - } - ing.mh.WriteExt = true - ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh) - for batchIdx := range ing.batches { - ing.batches[batchIdx] = &SpanIngestorBatch{ - incoming: make([]*IncomingSpan, 0, WRITESPANS_BATCH_SIZE), - } - } - return ing -} - -func (ing *SpanIngestor) IngestSpan(span *common.Span) { - ing.totalIngested++ - // Make sure the span ID is valid. - spanIdProblem := span.Id.FindProblem() - if spanIdProblem != "" { - // Can't print the invalid span ID because String() might fail. - ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem) - ing.serverDropped++ - return - } - - // Set the default tracer id, if needed. - if span.TracerId == "" { - span.TracerId = ing.defaultTrid - } - - // Encode the span data. Doing the encoding here is better than doing it - // in the shard goroutine, because we can achieve more parallelism. - // There is one shard goroutine per shard, but potentially many more - // ingestors per shard. - err := ing.enc.Encode(span.SpanData) - if err != nil { - ing.lg.Warnf("Failed to encode span ID %s: %s\n", - span.Id.String(), err.Error()) - ing.serverDropped++ - return - } - spanDataBytes := ing.spanDataBytes - ing.spanDataBytes = make([]byte, 0, 1024) - ing.enc.ResetBytes(&ing.spanDataBytes) - - // Determine which shard this span should go to. - shardIdx := ing.store.getShardIndex(span.Id) - batch := ing.batches[shardIdx] - incomingLen := len(batch.incoming) - if ing.lg.TraceEnabled() { - ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, "+ - "incomingLen=%d, cap(batch.incoming)=%d\n", - span.Id.String(), shardIdx, incomingLen, cap(batch.incoming)) - } - if incomingLen+1 == cap(batch.incoming) { - if ing.lg.TraceEnabled() { - ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d spans for "+ - "shard %d\n", len(batch.incoming), shardIdx) - } - ing.store.WriteSpans(shardIdx, batch.incoming) - batch.incoming = make([]*IncomingSpan, 1, WRITESPANS_BATCH_SIZE) - incomingLen = 0 - } else { - batch.incoming = batch.incoming[0 : incomingLen+1] - } - batch.incoming[incomingLen] = &IncomingSpan{ - Addr: ing.addr, - Span: span, - SpanDataBytes: spanDataBytes, - } -} - -func (ing *SpanIngestor) Close(startTime time.Time) { - for shardIdx := range ing.batches { - batch := ing.batches[shardIdx] - if len(batch.incoming) > 0 { - if ing.lg.TraceEnabled() { - ing.lg.Tracef("SpanIngestor#Close: flushing %d span(s) for "+ - "shard %d\n", len(batch.incoming), shardIdx) - } - ing.store.WriteSpans(shardIdx, batch.incoming) - } - batch.incoming = nil - } - ing.lg.Debugf("Closed span ingestor for %s. Ingested %d span(s); dropped "+ - "%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped) - - endTime := time.Now() - ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested, - ing.serverDropped, endTime.Sub(startTime)) -} - -func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) { - store.shards[shardIdx].incoming <- ispans -} - -func (store *dataStore) FindSpan(sid common.SpanId) *common.Span { - return store.shards[store.getShardIndex(sid)].FindSpan(sid) -} - -func (shd *shard) FindSpan(sid common.SpanId) *common.Span { - lg := shd.store.lg - primaryKey := append([]byte{SPAN_ID_INDEX_PREFIX}, sid.Val()...) - buf, err := shd.ldb.Get(shd.store.readOpts, primaryKey) - if err != nil { - if strings.Index(err.Error(), "NotFound:") != -1 { - return nil - } - lg.Warnf("Shard(%s): FindSpan(%s) error: %s\n", - shd.path, sid.String(), err.Error()) - return nil - } - var span *common.Span - span, err = shd.decodeSpan(sid, buf) - if err != nil { - lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding [%s]\n", - shd.path, sid.String(), err.Error(), hex.EncodeToString(buf)) - return nil - } - return span -} - -func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) { - r := bytes.NewBuffer(buf) - mh := new(codec.MsgpackHandle) - mh.WriteExt = true - decoder := codec.NewDecoder(r, mh) - data := common.SpanData{} - err := decoder.Decode(&data) - if err != nil { - return nil, err - } - if data.Parents == nil { - data.Parents = []common.SpanId{} - } - return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil -} - -// Find the children of a given span id. -func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId { - childIds := make([]common.SpanId, 0) - var err error - - startIdx := store.getShardIndex(sid) - idx := startIdx - numShards := len(store.shards) - for { - if lim == 0 { - break - } - shd := store.shards[idx] - childIds, lim, err = shd.FindChildren(sid, childIds, lim) - if err != nil { - store.lg.Errorf("Shard(%s): FindChildren(%s) error: %s\n", - shd.path, sid.String(), err.Error()) - } - idx++ - if idx >= numShards { - idx = 0 - } - if idx == startIdx { - break - } - } - return childIds -} - -type predicateData struct { - *common.Predicate - key []byte -} - -func loadPredicateData(pred *common.Predicate) (*predicateData, error) { - p := predicateData{Predicate: pred} - - // Parse the input value given to make sure it matches up with the field - // type. - switch pred.Field { - case common.SPAN_ID: - // Span IDs are sent as hex strings. - var id common.SpanId - if err := id.FromString(pred.Val); err != nil { - return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s", - pred.Val, err.Error())) - } - p.key = id.Val() - break - case common.DESCRIPTION: - // Any string is valid for a description. - p.key = []byte(pred.Val) - break - case common.BEGIN_TIME, common.END_TIME, common.DURATION: - // Parse a base-10 signed numeric field. - v, err := strconv.ParseInt(pred.Val, 10, 64) - if err != nil { - return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s", - pred.Field, pred.Val, err.Error())) - } - p.key = u64toSlice(s2u64(v)) - break - case common.TRACER_ID: - // Any string is valid for a tracer ID. - p.key = []byte(pred.Val) - break - default: - return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field)) - } - - // Validate the predicate operation. - switch pred.Op { - case common.EQUALS, common.LESS_THAN_OR_EQUALS, - common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN: - break - case common.CONTAINS: - if p.fieldIsNumeric() { - return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+ - "numeric field like '%s'", pred.Field)) - } - default: - return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'", - pred.Op)) - } - - return &p, nil -} - -// Get the index prefix for this predicate, or 0 if it is not indexed. -func (pred *predicateData) getIndexPrefix() byte { - switch pred.Field { - case common.SPAN_ID: - return SPAN_ID_INDEX_PREFIX - case common.BEGIN_TIME: - return BEGIN_TIME_INDEX_PREFIX - case common.END_TIME: - return END_TIME_INDEX_PREFIX - case common.DURATION: - return DURATION_INDEX_PREFIX - default: - return INVALID_INDEX_PREFIX - } -} - -// Returns true if the predicate type is numeric. -func (pred *predicateData) fieldIsNumeric() bool { - switch pred.Field { - case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION: - return true - default: - return false - } -} - -// Get the values that this predicate cares about for a given span. -func (pred *predicateData) extractRelevantSpanData(span *common.Span) []byte { - switch pred.Field { - case common.SPAN_ID: - return span.Id.Val() - case common.DESCRIPTION: - return []byte(span.Description) - case common.BEGIN_TIME: - return u64toSlice(s2u64(span.Begin)) - case common.END_TIME: - return u64toSlice(s2u64(span.End)) - case common.DURATION: - return u64toSlice(s2u64(span.Duration())) - case common.TRACER_ID: - return []byte(span.TracerId) - default: - panic(fmt.Sprintf("Unknown field type %s.", pred.Field)) - } -} - -func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool { - // nil is after everything. - if a == nil { - if b == nil { - return false - } - return false - } else if b == nil { - return true - } - // Compare the spans according to this predicate. - aVal := pred.extractRelevantSpanData(a) - bVal := pred.extractRelevantSpanData(b) - cmp := bytes.Compare(aVal, bVal) - if pred.Op.IsDescending() { - return cmp > 0 - } else { - return cmp < 0 - } -} - -type satisfiedByReturn int - -const ( - NOT_SATISFIED satisfiedByReturn = iota - NOT_YET_SATISFIED = iota - SATISFIED = iota -) - -func (r satisfiedByReturn) String() string { - switch (r) { - case NOT_SATISFIED: - return "NOT_SATISFIED" - case NOT_YET_SATISFIED: - return "NOT_YET_SATISFIED" - case SATISFIED: - return "SATISFIED" - default: - return "(unknown)" - } -} - -// Determine whether the predicate is satisfied by the given span. -func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn { - val := pred.extractRelevantSpanData(span) - switch pred.Op { - case common.CONTAINS: - if bytes.Contains(val, pred.key) { - return SATISFIED - } else { - return NOT_SATISFIED - } - case common.EQUALS: - if bytes.Equal(val, pred.key) { - return SATISFIED - } else { - return NOT_SATISFIED - } - case common.LESS_THAN_OR_EQUALS: - if bytes.Compare(val, pred.key) <= 0 { - return SATISFIED - } else { - return NOT_YET_SATISFIED - } - case common.GREATER_THAN_OR_EQUALS: - if bytes.Compare(val, pred.key) >= 0 { - return SATISFIED - } else { - return NOT_SATISFIED - } - case common.GREATER_THAN: - cmp := bytes.Compare(val, pred.key) - if cmp <= 0 { - return NOT_YET_SATISFIED - } else { - return SATISFIED - } - default: - panic(fmt.Sprintf("unknown Op type %s should have been caught "+ - "during normalization", pred.Op)) - } -} - -func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) { - var ret *source - src := source{store: store, - pred: pred, - shards: make([]*shard, len(store.shards)), - iters: make([]*levigo.Iterator, 0, len(store.shards)), - nexts: make([]*common.Span, len(store.shards)), - numRead: make([]int, len(store.shards)), - keyPrefix: pred.getIndexPrefix(), - } - if src.keyPrefix == INVALID_INDEX_PREFIX { - return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+ - "predicate on field %s", pred.Field)) - } - defer func() { - if ret == nil { - src.Close() - } - }() - for shardIdx := range store.shards { - shd := store.shards[shardIdx] - src.shards[shardIdx] = shd - src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts)) - } - var searchKey []byte - lg := store.lg - if prev != nil { - // If prev != nil, this query RPC is the continuation of a previous - // one. The final result returned the last time is 'prev'. - // - // To avoid returning the same results multiple times, we adjust the - // predicate here. If the predicate is on the span id field, we - // simply manipulate the span ID we're looking for. - // - // If the predicate is on a secondary index, we also use span ID, but - // in a slightly different way. Since the secondary indices are - // organized as [type-code][8b-secondary-key][8b-span-id], elements - // with the same secondary index field are ordered by span ID. So we - // create a 17-byte key incorporating the span ID from 'prev.' - startId := common.INVALID_SPAN_ID - switch pred.Op { - case common.EQUALS: - if pred.Field == common.SPAN_ID { - // This is an annoying corner case. There can only be one - // result each time we do an EQUALS search for a span id. - // Span id is the primary key for all our spans. - // But for some reason someone is asking for another result. - // We modify the query to search for the illegal 0 span ID, - // which will never be present. - if lg.DebugEnabled() { - lg.Debugf("Attempted to use a continuation token with an EQUALS "+ - "SPAN_ID query. %s. Setting search id = 0", - pred.Predicate.String()) - } - startId = common.INVALID_SPAN_ID - } else { - // When doing an EQUALS search on a secondary index, the - // results are sorted by span id. - startId = prev.Id.Next() - } - case common.LESS_THAN_OR_EQUALS: - // Subtract one from the previous span id. Since the previous - // start ID will never be 0 (0 is an illegal span id), we'll never - // wrap around when doing this. - startId = prev.Id.Prev() - case common.GREATER_THAN_OR_EQUALS: - // We can't add one to the span id, since the previous span ID - // might be the maximum value. So just switch over to using - // GREATER_THAN. - pred.Op = common.GREATER_THAN - startId = prev.Id - case common.GREATER_THAN: - // This one is easy. - startId = prev.Id - default: - str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String()) - lg.Error(str + "\n") - panic(str) - } - if pred.Field == common.SPAN_ID { - pred.key = startId.Val() - searchKey = append([]byte{src.keyPrefix}, startId.Val()...) - } else { - // Start where the previous query left off. This means adjusting - // our uintKey. - pred.key = pred.extractRelevantSpanData(prev) - searchKey = append(append([]byte{src.keyPrefix}, pred.key...), - startId.Val()...) - } - if lg.TraceEnabled() { - lg.Tracef("Handling continuation token %s for %s. startId=%d, "+ - "pred.uintKey=%s\n", prev, pred.Predicate.String(), startId, - hex.EncodeToString(pred.key)) - } - } else { - searchKey = append([]byte{src.keyPrefix}, pred.key...) - } - for i := range src.iters { - src.iters[i].Seek(searchKey) - } - ret = &src - return ret, nil -} - -// A source of spans. -type source struct { - store *dataStore - pred *predicateData - shards []*shard - iters []*levigo.Iterator - nexts []*common.Span - numRead []int - keyPrefix byte -} - -func CreateReaperSource(shd *shard) (*source, error) { - store := shd.store - p := &common.Predicate{ - Op: common.GREATER_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: common.INVALID_SPAN_ID.String(), - } - pred, err := loadPredicateData(p) - if err != nil { - return nil, err - } - src := &source{ - store: store, - pred: pred, - shards: []*shard{shd}, - iters: make([]*levigo.Iterator, 1), - nexts: make([]*common.Span, 1), - numRead: make([]int, 1), - keyPrefix: pred.getIndexPrefix(), - } - iter := shd.ldb.NewIterator(store.readOpts) - src.iters[0] = iter - searchKey := append(append([]byte{src.keyPrefix}, pred.key...), - pred.key...) - iter.Seek(searchKey) - return src, nil -} - -// Fill in the entry in the 'next' array for a specific shard. -func (src *source) populateNextFromShard(shardIdx int) { - lg := src.store.lg - var err error - iter := src.iters[shardIdx] - shdPath := src.shards[shardIdx].path - if iter == nil { - lg.Debugf("Can't populate: No more entries in shard %s\n", shdPath) - return // There are no more entries in this shard. - } - if src.nexts[shardIdx] != nil { - lg.Debugf("No need to populate shard %s\n", shdPath) - return // We already have a valid entry for this shard. - } - for { - if !iter.Valid() { - lg.Debugf("Can't populate: Iterator for shard %s is no longer valid.\n", shdPath) - break // Can't read past end of DB - } - src.numRead[shardIdx]++ - key := iter.Key() - if len(key) < 1 { - lg.Warnf("Encountered invalid zero-byte key in shard %s.\n", shdPath) - break - } - ret := src.checkKeyPrefix(key[0], iter) - if ret == NOT_SATISFIED { - break // Can't read past end of indexed section - } else if ret == NOT_YET_SATISFIED { - if src.pred.Op.IsDescending() { - iter.Prev() - } else { - iter.Next() - } - continue // Try again because we are not yet at the indexed section. - } - var span *common.Span - var sid common.SpanId - if src.keyPrefix == SPAN_ID_INDEX_PREFIX { - // The span id maps to the span itself. - sid = common.SpanId(key[1:17]) - span, err = src.shards[shardIdx].decodeSpan(sid, iter.Value()) - if err != nil { - if lg.DebugEnabled() { - lg.Debugf("Internal error decoding span %s in shard %s: %s\n", - sid.String(), shdPath, err.Error()) - } - break - } - } else { - // With a secondary index, we have to look up the span by id. - sid = common.SpanId(key[9:25]) - span = src.shards[shardIdx].FindSpan(sid) - if span == nil { - if lg.DebugEnabled() { - lg.Debugf("Internal error rehydrating span %s in shard %s\n", - sid.String(), shdPath) - } - break - } - } - if src.pred.Op.IsDescending() { - iter.Prev() - } else { - iter.Next() - } - ret = src.pred.satisfiedBy(span) - if ret == SATISFIED { - if lg.DebugEnabled() { - lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath) - } - src.nexts[shardIdx] = span // Found valid entry - return - } - if ret == NOT_SATISFIED { - // This and subsequent entries don't satisfy predicate - break - } - } - lg.Debugf("Closing iterator for shard %s.\n", shdPath) - iter.Close() - src.iters[shardIdx] = nil -} - -// Check the key prefix against the key prefix of the query. -func (src *source) checkKeyPrefix(kp byte, iter *levigo.Iterator) satisfiedByReturn { - if kp == src.keyPrefix { - return SATISFIED - } else if kp < src.keyPrefix { - if src.pred.Op.IsDescending() { - return NOT_SATISFIED - } else { - return NOT_YET_SATISFIED - } - } else { - if src.pred.Op.IsDescending() { - return NOT_YET_SATISFIED - } else { - return NOT_SATISFIED - } - } -} - - -func (src *source) next() *common.Span { - for shardIdx := range src.shards { - src.populateNextFromShard(shardIdx) - } - var best *common.Span - bestIdx := -1 - for shardIdx := range src.iters { - span := src.nexts[shardIdx] - if src.pred.spanPtrIsBefore(span, best) { - best = span - bestIdx = shardIdx - } - } - if bestIdx >= 0 { - src.nexts[bestIdx] = nil - } - return best -} - -func (src *source) Close() { - for i := range src.iters { - if src.iters[i] != nil { - src.iters[i].Close() - } - } - src.iters = nil -} - -func (src *source) getStats() string { - ret := fmt.Sprintf("Source stats: pred = %s", src.pred.String()) - prefix := ". " - for shardIdx := range src.shards { - next := fmt.Sprintf("%sRead %d spans from %s", prefix, - src.numRead[shardIdx], src.shards[shardIdx].path) - prefix = ", " - ret = ret + next - } - return ret -} - -func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) { - // Read spans from the first predicate that is indexed. - p := *preds - for i := range p { - pred := p[i] - if pred.getIndexPrefix() != INVALID_INDEX_PREFIX { - *preds = append(p[0:i], p[i+1:]...) - return pred.createSource(store, span) - } - } - // If there are no predicates that are indexed, read rows in order of span id. - spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS, - Field: common.SPAN_ID, - Val: common.INVALID_SPAN_ID.String(), - } - spanIdPredData, err := loadPredicateData(&spanIdPred) - if err != nil { - return nil, err - } - return spanIdPredData.createSource(store, span) -} - -func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error, []int) { - lg := store.lg - // Parse predicate data. - var err error - preds := make([]*predicateData, len(query.Predicates)) - for i := range query.Predicates { - preds[i], err = loadPredicateData(&query.Predicates[i]) - if err != nil { - return nil, err, nil - } - } - // Get a source of rows. - var src *source - src, err = store.obtainSource(&preds, query.Prev) - if err != nil { - return nil, err, nil - } - defer src.Close() - if lg.DebugEnabled() { - lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src) - } - - // Filter the spans through the remaining predicates. - reserved := 32 - if query.Lim < reserved { - reserved = query.Lim - } - ret := make([]*common.Span, 0, reserved) - for { - if len(ret) >= query.Lim { - if lg.DebugEnabled() { - lg.Debugf("HandleQuery %s: hit query limit after obtaining " + - "%d results. %s\n.", query, query.Lim, src.getStats()) - } - break // we hit the result size limit - } - span := src.next() - if span == nil { - if lg.DebugEnabled() { - lg.Debugf("HandleQuery %s: found %d result(s), which are " + - "all that exist. %s\n", query, len(ret), src.getStats()) - } - break // the source has no more spans to give - } - if lg.DebugEnabled() { - lg.Debugf("src.next returned span %s\n", span.ToJson()) - } - satisfied := true - for predIdx := range preds { - if preds[predIdx].satisfiedBy(span) != SATISFIED { - satisfied = false - break - } - } - if satisfied { - ret = append(ret, span) - } - } - return ret, nil, src.numRead -} - -func (store *dataStore) ServerStats() *common.ServerStats { - serverStats := common.ServerStats{ - Dirs: make([]common.StorageDirectoryStats, len(store.shards)), - } - for shardIdx := range store.shards { - shard := store.shards[shardIdx] - serverStats.Dirs[shardIdx].Path = shard.path - r := levigo.Range{ - Start: []byte{0}, - Limit: []byte{0xff}, - } - vals := shard.ldb.GetApproximateSizes([]levigo.Range{r}) - serverStats.Dirs[shardIdx].ApproximateBytes = vals[0] - serverStats.Dirs[shardIdx].LevelDbStats = - shard.ldb.PropertyValue("leveldb.stats") - store.msink.lg.Debugf("levedb.stats for %s: %s\n", - shard.path, shard.ldb.PropertyValue("leveldb.stats")) - } - serverStats.LastStartMs = store.startMs - serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC()) - serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans) - store.msink.PopulateServerStats(&serverStats) - return &serverStats -}