htrace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject incubator-htrace git commit: HTRACE-302. htraced: Add admissions control to HRPC to limit the number of incoming messages (cmccabe)
Date Fri, 20 Nov 2015 00:55:59 GMT
Repository: incubator-htrace
Updated Branches:
  refs/heads/master fc0d8f38f -> c715e12eb


HTRACE-302. htraced: Add admissions control to HRPC to limit the number of incoming messages
(cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/c715e12e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/c715e12e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/c715e12e

Branch: refs/heads/master
Commit: c715e12eb085cf551e90567f80c78886a3cc07f6
Parents: fc0d8f3
Author: Colin P. Mccabe <cmccabe@apache.org>
Authored: Thu Nov 19 16:45:42 2015 -0800
Committer: Colin P. Mccabe <cmccabe@apache.org>
Committed: Thu Nov 19 16:52:56 2015 -0800

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/client/client.go   |  28 ++-
 .../go/src/org/apache/htrace/client/hclient.go  |  11 +-
 .../go/src/org/apache/htrace/common/log.go      |   4 +
 .../src/org/apache/htrace/conf/config_keys.go   |  12 ++
 .../org/apache/htrace/htraced/client_test.go    | 133 ++++++++++++-
 .../src/org/apache/htrace/htraced/datastore.go  |   1 +
 .../org/apache/htrace/htraced/datastore_test.go |   4 +-
 .../go/src/org/apache/htrace/htraced/hrpc.go    | 193 ++++++++++++++-----
 .../go/src/org/apache/htrace/htraced/htraced.go |   2 +-
 .../org/apache/htrace/htraced/metrics_test.go   |   7 +-
 .../org/apache/htrace/htraced/mini_htraced.go   |   6 +-
 .../go/src/org/apache/htrace/htracedTool/cmd.go |   2 +-
 12 files changed, 333 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 1140209..0028545 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -33,25 +33,35 @@ import (
 
 // A golang client for htraced.
 // TODO: fancier APIs for streaming spans in the background, optimize TCP stuff
-
-func NewClient(cnf *conf.Config) (*Client, error) {
-	hcl := Client{}
+func NewClient(cnf *conf.Config, testHooks *TestHooks) (*Client, error) {
+	hcl := Client{testHooks: testHooks}
 	hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS)
-	hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+	if testHooks != nil && testHooks.HrpcDisabled {
+		hcl.hrpcAddr = ""
+	} else {
+		hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+	}
 	return &hcl, nil
 }
 
+type TestHooks struct {
+	// If true, HRPC is disabled.
+	HrpcDisabled bool
+
+	// A function which gets called after we connect to the server and send the
+	// message frame, but before we write the message body.
+	HandleWriteRequestBody func()
+}
+
 type Client struct {
 	// REST address of the htraced server.
 	restAddr string
 
 	// HRPC address of the htraced server.
 	hrpcAddr string
-}
 
-// Disable HRPC
-func (hcl *Client) DisableHrpc() {
-	hcl.hrpcAddr = ""
+	// The test hooks to use, or nil if test hooks are not enabled.
+	testHooks *TestHooks
 }
 
 // Get the htraced server version information.
@@ -136,7 +146,7 @@ func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
 	if hcl.hrpcAddr == "" {
 		return hcl.writeSpansHttp(req)
 	}
-	hcr, err := newHClient(hcl.hrpcAddr)
+	hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks)
 	if err != nil {
 		return err
 	}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
index 608dd59..ef79deb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
@@ -38,6 +38,7 @@ type hClient struct {
 type HrpcClientCodec struct {
 	rwc    io.ReadWriteCloser
 	length uint32
+	testHooks *TestHooks
 }
 
 func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error {
@@ -72,6 +73,9 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro
 		return errors.New(fmt.Sprintf("Error writing header bytes: %s",
 			err.Error()))
 	}
+	if cdc.testHooks != nil && cdc.testHooks.HandleWriteRequestBody != nil {
+		cdc.testHooks.HandleWriteRequestBody()
+	}
 	_, err = cdc.rwc.Write(buf)
 	if err != nil {
 		return errors.New(fmt.Sprintf("Error writing body bytes: %s",
@@ -136,14 +140,17 @@ func (cdc *HrpcClientCodec) Close() error {
 	return cdc.rwc.Close()
 }
 
-func newHClient(hrpcAddr string) (*hClient, error) {
+func newHClient(hrpcAddr string, testHooks *TestHooks) (*hClient, error) {
 	hcr := hClient{}
 	conn, err := net.Dial("tcp", hrpcAddr)
 	if err != nil {
 		return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+
 			"at %s: %s", hrpcAddr, err.Error()))
 	}
-	hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn})
+	hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{
+		rwc: conn,
+		testHooks: testHooks,
+	})
 	return &hcr, nil
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/common/log.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/log.go b/htrace-htraced/go/src/org/apache/htrace/common/log.go
index 4066094..8a26507 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/log.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/log.go
@@ -291,6 +291,10 @@ func (lg *Logger) ErrorEnabled() bool {
 	return lg.Level <= ERROR
 }
 
+func (lg *Logger) LevelEnabled(level Level) bool {
+	return lg.Level <= level
+}
+
 func (lg *Logger) Close() {
 	lg.sink.Unref()
 	lg.sink = nil

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index d10f3af..511833c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -86,6 +86,16 @@ const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms"
 // started.
 const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address"
 
+// The maximum number of HRPC handler goroutines we will create at once.  If
+// this is too small, we won't get enough concurrency; if it's too big, we will
+// buffer too much data in memory while waiting for the datastore to process
+// requests.
+const HTRACE_NUM_HRPC_HANDLERS = "num.hrpc.handlers"
+
+// The I/O timeout HRPC will use, in milliseconds.  If it takes longer than
+// this to read or write a message, we will abort the connection.
+const HTRACE_HRPC_IO_TIMEOUT_MS = "hrpc.io.timeout.ms"
+
 // Default values for HTrace configuration keys.
 var DEFAULTS = map[string]string{
 	HTRACE_WEB_ADDRESS:  fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
@@ -100,6 +110,8 @@ var DEFAULTS = map[string]string{
 	HTRACE_METRICS_MAX_ADDR_ENTRIES:    "100000",
 	HTRACE_SPAN_EXPIRY_MS:              "0",
 	HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  fmt.Sprintf("%d", 90*1000),
+	HTRACE_NUM_HRPC_HANDLERS:           "20",
+	HTRACE_HRPC_IO_TIMEOUT_MS:          "60000",
 }
 
 // Values to be used when creating test configurations

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 9a51cd4..e4f2151 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -22,12 +22,15 @@ package main
 import (
 	"fmt"
 	"math/rand"
-	htrace "org/apache/htrace/client"
 	"org/apache/htrace/common"
+	"org/apache/htrace/conf"
 	"org/apache/htrace/test"
 	"sort"
 	"testing"
 	"time"
+	"sync"
+	"sync/atomic"
+	htrace "org/apache/htrace/client"
 )
 
 func TestClientGetServerVersion(t *testing.T) {
@@ -39,7 +42,7 @@ func TestClientGetServerVersion(t *testing.T) {
 	}
 	defer ht.Close()
 	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf())
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
@@ -58,7 +61,7 @@ func TestClientGetServerDebugInfo(t *testing.T) {
 	}
 	defer ht.Close()
 	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf())
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
@@ -95,7 +98,7 @@ func TestClientOperations(t *testing.T) {
 	}
 	defer ht.Close()
 	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf())
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
@@ -185,7 +188,7 @@ func TestDumpAll(t *testing.T) {
 	}
 	defer ht.Close()
 	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf())
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
@@ -246,7 +249,7 @@ func TestClientGetServerConf(t *testing.T) {
 	}
 	defer ht.Close()
 	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf())
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
@@ -259,3 +262,121 @@ func TestClientGetServerConf(t *testing.T) {
 			EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
 	}
 }
+
+const TEST_NUM_HRPC_HANDLERS = 2
+
+const TEST_NUM_WRITESPANS = 4
+
+// Tests that HRPC limits the number of simultaneous connections being processed.
+func TestHrpcAdmissionsControl(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(TEST_NUM_WRITESPANS)
+	var numConcurrentHrpcCalls int32
+	testHooks := &hrpcTestHooks {
+		HandleAdmission: func() {
+			defer wg.Done()
+			n := atomic.AddInt32(&numConcurrentHrpcCalls, 1)
+			if n > TEST_NUM_HRPC_HANDLERS {
+				t.Fatalf("The number of concurrent HRPC calls went above " +
+					"%d: it's at %d\n", TEST_NUM_HRPC_HANDLERS, n)
+			}
+			time.Sleep(1 * time.Millisecond)
+			n = atomic.AddInt32(&numConcurrentHrpcCalls, -1)
+			if n >= TEST_NUM_HRPC_HANDLERS {
+				t.Fatalf("The number of concurrent HRPC calls went above " +
+					"%d: it was at %d\n", TEST_NUM_HRPC_HANDLERS, n + 1)
+			}
+		},
+	}
+	htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl",
+		DataDirs: make([]string, 2),
+		Cnf: map[string]string{
+			conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
+		},
+		WrittenSpans: make(chan *common.Span, TEST_NUM_WRITESPANS),
+		HrpcTestHooks: testHooks,
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+	// Create some random trace spans.
+	allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
+	for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
+		go func(i int) {
+			err = hcl.WriteSpans(&common.WriteSpansReq{
+				Spans: allSpans[i:i+1],
+			})
+			if err != nil {
+				t.Fatalf("WriteSpans failed: %s\n", err.Error())
+			}
+		}(iter)
+	}
+	wg.Wait()
+	for i := 0; i < TEST_NUM_WRITESPANS; i++ {
+		<-ht.Store.WrittenSpans
+	}
+}
+
+// Tests that HRPC I/O timeouts work.
+func TestHrpcIoTimeout(t *testing.T) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout",
+		DataDirs: make([]string, 2),
+		Cnf: map[string]string{
+			conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
+			conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1",
+		},
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	finishClient := make(chan interface{})
+	defer func() {
+		// Close the finishClient channel, if it hasn't already been closed. 
+		defer func() {recover()}()
+		close(finishClient)
+	}()
+	testHooks := &htrace.TestHooks {
+		HandleWriteRequestBody: func() {
+			<-finishClient
+		},
+	}
+	hcl, err = htrace.NewClient(ht.ClientConf(), testHooks)
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+	// Create some random trace spans.
+	allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
+	var wg sync.WaitGroup
+	wg.Add(TEST_NUM_WRITESPANS)
+	for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
+		go func(i int) {
+			defer wg.Done()
+			// Ignore the error return because there are internal retries in
+			// the client which will make this succeed eventually, usually.
+			// Keep in mind that we only block until we have seen
+			// TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
+			// we let requests through so that the test can exit cleanly.
+			hcl.WriteSpans(&common.WriteSpansReq{
+				Spans: allSpans[i:i+1],
+			})
+		}(iter)
+	}
+	for {
+		if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS {
+			break
+		}
+		time.Sleep(1000 * time.Nanosecond)
+	}
+	close(finishClient)
+	wg.Wait()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index ab2747b..a4bb320 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -289,6 +289,7 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error {
 	}
 	shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
 	if shd.store.WrittenSpans != nil {
+		shd.store.lg.Errorf("WATERMELON: Sending span to shd.store.WrittenSpans\n")
 		shd.store.WrittenSpans <- span
 	}
 	return nil

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 576ee0b..0443834 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -410,7 +410,7 @@ func TestReloadDataStore(t *testing.T) {
 		}
 	}()
 	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf())
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
@@ -444,7 +444,7 @@ func TestReloadDataStore(t *testing.T) {
 	if err != nil {
 		t.Fatalf("failed to re-create datastore: %s", err.Error())
 	}
-	hcl, err = htrace.NewClient(ht.ClientConf())
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
 	if err != nil {
 		t.Fatalf("failed to re-create client: %s", err.Error())
 	}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 0d72602..a0f2e81 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -33,9 +33,13 @@ import (
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
 	"reflect"
+	"sync"
+	"sync/atomic"
 	"time"
 )
 
+const MAX_HRPC_HANDLERS = 32765
+
 // Handles HRPC calls
 type HrpcHandler struct {
 	lg    *common.Logger
@@ -46,14 +50,57 @@ type HrpcHandler struct {
 type HrpcServer struct {
 	*rpc.Server
 	hand     *HrpcHandler
+
+	// The listener we are using to accept new connections.
 	listener net.Listener
+
+	// A WaitGroup used to block until the HRPC server has exited.
+	exited   sync.WaitGroup
+
+	// A channel containing server codecs to use.  This channel is fully
+	// buffered.  The number of entries it initially contains determines how
+	// many concurrent codecs we will have running at once.
+	cdcs     chan *HrpcServerCodec
+
+	// Used to shut down
+	shutdown chan interface{}
+
+	// The I/O timeout to use when reading requests or sending responses.  This
+	// timeout does not apply to the time we spend processing the message.
+	ioTimeo    time.Duration
+
+	// A count of all I/O errors that we have encountered since the server
+	// started.  This counts errors like improperly formatted message frames,
+	// but not errors like properly formatted but invalid messages.
+	// This count is updated from multiple goroutines via sync/atomic.
+	ioErrorCount  uint64
+
+	// The test hooks to use, or nil during normal operation.
+	testHooks *hrpcTestHooks
+}
+
+type hrpcTestHooks struct {
+	// A callback we make right after calling Accept() but before reading from
+	// the new connection.
+	HandleAdmission func()
 }
 
-// Codec which encodes HRPC data via JSON
+// A codec which encodes HRPC data via JSON.  This structure holds the context
+// for a particular client connection.
 type HrpcServerCodec struct {
 	lg     *common.Logger
+
+	// The current connection.
 	conn   net.Conn
+
+	// The HrpcServer which this connection is part of.
+	hsv    *HrpcServer
+
+	// The message length we read from the header.
 	length uint32
+
+	// The number of messages this connection has handled.
+	numHandled int
 }
 
 func asJson(val interface{}) string {
@@ -64,45 +111,51 @@ func asJson(val interface{}) string {
 	return string(js)
 }
 
-func createErrAndWarn(lg *common.Logger, val string) error {
-	return createErrAndLog(lg, val, common.WARN)
+func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
+	return newIoError(cdc, val, common.WARN)
 }
 
-func createErrAndLog(lg *common.Logger, val string, level common.Level) error {
-	lg.Write(level, val+"\n")
+func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
+	if cdc.lg.LevelEnabled(level) {
+		cdc.lg.Write(level, cdc.conn.RemoteAddr().String() + ": " + val + "\n")
+	}
+	if level >= common.INFO {
+		atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
+	}
 	return errors.New(val)
 }
 
 func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
 	hdr := common.HrpcRequestHeader{}
 	if cdc.lg.TraceEnabled() {
-		cdc.lg.Tracef("Reading HRPC request header from %s\n", cdc.conn.RemoteAddr())
+		cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr())
 	}
+	cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
 	err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
 	if err != nil {
-		level := common.WARN
-		if err == io.EOF {
-			level = common.DEBUG
+		if err == io.EOF && cdc.numHandled > 0 {
+			return newIoError(cdc, fmt.Sprintf("Remote closed connection " +
+				"after writing %d message(s)", cdc.numHandled), common.DEBUG)
 		}
-		return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading header bytes: %s",
-			err.Error()), level)
+		return newIoError(cdc,
+			fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN)
 	}
 	if cdc.lg.TraceEnabled() {
-		cdc.lg.Tracef("Read HRPC request header %s from %s\n",
-			asJson(&hdr), cdc.conn.RemoteAddr())
+		cdc.lg.Tracef("%s: Read HRPC request header %s\n",
+			cdc.conn.RemoteAddr(), asJson(&hdr))
 	}
 	if hdr.Magic != common.HRPC_MAGIC {
-		return createErrAndWarn(cdc.lg, fmt.Sprintf("Invalid request header: expected "+
+		return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+
 			"magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
 	}
 	if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
-		return createErrAndWarn(cdc.lg, fmt.Sprintf("Length prefix was too long.  "+
+		return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long.  "+
 			"Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
 			hdr.Length))
 	}
 	req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
 	if req.ServiceMethod == "" {
-		return createErrAndWarn(cdc.lg, fmt.Sprintf("Unknown MethodID code 0x%04x",
+		return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x",
 			hdr.MethodId))
 	}
 	req.Seq = hdr.Seq
@@ -111,34 +164,36 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
 }
 
 func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
-	remoteAddr := cdc.conn.RemoteAddr()
 	if cdc.lg.TraceEnabled() {
-		cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
-			cdc.length, remoteAddr)
+		cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
+			cdc.conn.RemoteAddr(), cdc.length)
 	}
 	mh := new(codec.MsgpackHandle)
 	mh.WriteExt = true
 	dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh)
 	err := dec.Decode(body)
 	if err != nil {
-		return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read request "+
-			"body from %s: %s", remoteAddr, err.Error()))
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte " +
+			"request body: %s", cdc.length, err.Error()))
 	}
 	if cdc.lg.TraceEnabled() {
-		cdc.lg.Tracef("Read body from %s: %s\n",
-			remoteAddr, asJson(&body))
+		cdc.lg.Tracef("%s: read %d-byte request body %s\n",
+			cdc.conn.RemoteAddr(), cdc.length, asJson(&body))
 	}
 	val := reflect.ValueOf(body)
 	addr := val.Elem().FieldByName("Addr")
 	if addr.IsValid() {
-		addr.SetString(remoteAddr.String())
+		addr.SetString(cdc.conn.RemoteAddr().String())
 	}
+	var zeroTime time.Time
+	cdc.conn.SetDeadline(zeroTime)
 	return nil
 }
 
 var EMPTY []byte = make([]byte, 0)
 
 func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
+	cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
 	var err error
 	buf := EMPTY
 	if msg != nil {
@@ -148,7 +203,7 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
 		enc := codec.NewEncoder(w, mh)
 		err := enc.Encode(msg)
 		if err != nil {
-			return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to marshal "+
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
 				"response message: %s", err.Error()))
 		}
 		buf = w.Bytes()
@@ -161,13 +216,13 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
 	writer := bufio.NewWriterSize(cdc.conn, 256)
 	err = binary.Write(writer, binary.LittleEndian, &hdr)
 	if err != nil {
-		return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write response "+
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
 			"header: %s", err.Error()))
 	}
 	if hdr.ErrLength > 0 {
 		_, err = io.WriteString(writer, resp.Error)
 		if err != nil {
-			return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write error "+
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+
 				"string: %s", err.Error()))
 		}
 	}
@@ -175,24 +230,30 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
 		var length int
 		length, err = writer.Write(buf)
 		if err != nil {
-			return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write response "+
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
 				"message: %s", err.Error()))
 		}
 		if uint32(length) != hdr.Length {
-			return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write all of "+
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+
 				"response message: %s", err.Error()))
 		}
 	}
 	err = writer.Flush()
 	if err != nil {
-		return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write the response "+
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+
 			"bytes: %s", err.Error()))
 	}
+	cdc.numHandled++
 	return nil
 }
 
 func (cdc *HrpcServerCodec) Close() error {
-	return cdc.conn.Close()
+	err := cdc.conn.Close()
+	cdc.conn = nil
+	cdc.length = 0
+	cdc.numHandled = 0
+	cdc.hsv.cdcs <- cdc
+	return err
 }
 
 func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
@@ -228,14 +289,36 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
 	return nil
 }
 
-func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) {
+func CreateHrpcServer(cnf *conf.Config, store *dataStore,
+		testHooks *hrpcTestHooks) (*HrpcServer, error) {
 	lg := common.NewLogger("hrpc", cnf)
+	numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
+	if numHandlers < 1 {
+		lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS)
+		numHandlers = 1
+	}
+	if numHandlers > MAX_HRPC_HANDLERS {
+		lg.Warnf("%s cannot be more than %d: using %d handlers\n",
+			conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS)
+		numHandlers = MAX_HRPC_HANDLERS
+	}
 	hsv := &HrpcServer{
 		Server: rpc.NewServer(),
 		hand: &HrpcHandler{
 			lg:    lg,
 			store: store,
 		},
+		cdcs: make(chan *HrpcServerCodec, numHandlers),
+		shutdown: make(chan interface{}),
+		ioTimeo: time.Millisecond *
+			time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
+		testHooks: testHooks,
+	}
+	for i := 0; i < numHandlers; i++ {
+		hsv.cdcs <- &HrpcServerCodec{
+			lg:   lg,
+			hsv:  hsv,
+		}
 	}
 	var err error
 	hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
@@ -243,26 +326,42 @@ func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) {
 		return nil, err
 	}
 	hsv.Server.Register(hsv.hand)
+	hsv.exited.Add(1)
 	go hsv.run()
-	lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String())
+	lg.Infof("Started HRPC server on %s with %d handler routines. " +
+		"ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
+		hsv.ioTimeo.String())
 	return hsv, nil
 }
 
 func (hsv *HrpcServer) run() {
 	lg := hsv.hand.lg
+	srvAddr := hsv.listener.Addr().String()
+	defer func() {
+		lg.Infof("HrpcServer on %s exiting\n", srvAddr)
+		hsv.exited.Done()
+	}()
 	for {
-		conn, err := hsv.listener.Accept()
-		if err != nil {
-			lg.Errorf("HRPC Accept error: %s\n", err.Error())
-			continue
-		}
-		if lg.TraceEnabled() {
-			lg.Tracef("Accepted HRPC connection from %s\n", conn.RemoteAddr())
+		select {
+		case cdc:=<-hsv.cdcs:
+			conn, err := hsv.listener.Accept()
+			if err != nil {
+				lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error())
+				hsv.cdcs<-cdc // never blocks; there is always sufficient buffer space
+				continue
+			}
+			if lg.TraceEnabled() {
+				lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr())
+			}
+			cdc.conn = conn
+			cdc.numHandled = 0
+			if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil {
+				hsv.testHooks.HandleAdmission()
+			}
+			go hsv.ServeCodec(cdc)
+		case <-hsv.shutdown:
+			return
 		}
-		go hsv.ServeCodec(&HrpcServerCodec{
-			lg:   lg,
-			conn: conn,
-		})
 	}
 }
 
@@ -270,6 +369,12 @@ func (hsv *HrpcServer) Addr() net.Addr {
 	return hsv.listener.Addr()
 }
 
+func (hsv *HrpcServer) GetNumIoErrors() uint64 {
+	return atomic.LoadUint64(&hsv.ioErrorCount)
+}
+
 func (hsv *HrpcServer) Close() {
+	close(hsv.shutdown)
 	hsv.listener.Close()
+	hsv.exited.Wait()
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
index 97b72ca..5b0dfc6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
@@ -110,7 +110,7 @@ func main() {
 	}
 	var hsv *HrpcServer
 	if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
-		hsv, err = CreateHrpcServer(cnf, store)
+		hsv, err = CreateHrpcServer(cnf, store, nil)
 		if err != nil {
 			lg.Errorf("Error creating HRPC server: %s\n", err.Error())
 			os.Exit(1)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index 48c20f0..5243d9e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -205,13 +205,12 @@ func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
 	}
 	defer ht.Close()
 	var hcl *htrace.Client
-	hcl, err = htrace.NewClient(ht.ClientConf())
+	hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks {
+		HrpcDisabled: !usePacked,
+	})
 	if err != nil {
 		t.Fatalf("failed to create client: %s", err.Error())
 	}
-	if !usePacked {
-		hcl.DisableHrpc()
-	}
 
 	NUM_TEST_SPANS := 12
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index a50799a..353beae 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -55,6 +55,9 @@ type MiniHTracedBuilder struct {
 
 	// If non-null, the WrittenSpans channel to use when creating the DataStore.
 	WrittenSpans chan *common.Span
+
+	// The test hooks to use for the HRPC server
+	HrpcTestHooks *hrpcTestHooks
 }
 
 type MiniHTraced struct {
@@ -141,7 +144,7 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
 		return nil, err
 	}
 	rstListener = nil
-	hsv, err = CreateHrpcServer(cnf, store)
+	hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks)
 	if err != nil {
 		return nil, err
 	}
@@ -175,6 +178,7 @@ func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
 func (ht *MiniHTraced) Close() {
 	ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
 	ht.Rsv.Close()
+	ht.Hsv.Close()
 	ht.Store.Close()
 	if !ht.KeepDataDirsOnClose {
 		for idx := range ht.DataDirs {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 394e1c1..7b5e433 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -123,7 +123,7 @@ func main() {
 	}
 
 	// Create HTrace client
-	hcl, err := htrace.NewClient(cnf)
+	hcl, err := htrace.NewClient(cnf, nil)
 	if err != nil {
 		fmt.Printf("Failed to create HTrace client: %s\n", err.Error())
 		os.Exit(EXIT_FAILURE)


Mime
View raw message