htrace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject incubator-htrace git commit: HTRACE-308. Deserialize WriteSpans requests incrementally rather than all at once to optimize GC (cmccabe)
Date Tue, 01 Dec 2015 21:32:49 GMT
Repository: incubator-htrace
Updated Branches:
  refs/heads/master 2ccb38813 -> 35053cfc5


HTRACE-308. Deserialize WriteSpans requests incrementally rather than all at once to optimize GC (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/35053cfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/35053cfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/35053cfc

Branch: refs/heads/master
Commit: 35053cfc557c24e94b7336fc24801d8bdfafc973
Parents: 2ccb388
Author: Colin P. Mccabe <cmccabe@apache.org>
Authored: Tue Dec 1 13:27:53 2015 -0800
Committer: Colin P. Mccabe <cmccabe@apache.org>
Committed: Tue Dec 1 13:27:53 2015 -0800

----------------------------------------------------------------------
 htrace-c/src/receiver/htraced.c                 | 10 +--
 htrace-htraced/go/Godeps/Godeps.json            |  2 +-
 .../go/src/org/apache/htrace/client/client.go   | 20 +++--
 .../go/src/org/apache/htrace/client/hclient.go  | 41 +++++++---
 .../go/src/org/apache/htrace/common/rpc.go      |  6 +-
 .../org/apache/htrace/htraced/client_test.go    | 31 +++-----
 .../org/apache/htrace/htraced/datastore_test.go |  4 +-
 .../go/src/org/apache/htrace/htraced/hrpc.go    | 83 ++++++++++++--------
 .../org/apache/htrace/htraced/metrics_test.go   |  4 +-
 .../go/src/org/apache/htrace/htraced/rest.go    | 38 ++++-----
 .../go/src/org/apache/htrace/htracedTool/cmd.go |  4 +-
 .../main/java/org/apache/htrace/impl/Conf.java  |  4 +-
 .../org/apache/htrace/impl/PackedBuffer.java    |  8 +-
 .../apache/htrace/impl/RestBufferManager.java   | 23 ++----
 14 files changed, 152 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-c/src/receiver/htraced.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c
index d92518d..3a2a091 100644
--- a/htrace-c/src/receiver/htraced.c
+++ b/htrace-c/src/receiver/htraced.c
@@ -71,7 +71,7 @@
  * The maximum length of the message we will send to the server.
  * This must be the same or shorter than MAX_HRPC_BODY_LENGTH in rpc.go.
  */
-#define MAX_HRPC_LEN (64ULL * 1024ULL * 1024ULL)
+#define MAX_HRPC_LEN (32ULL * 1024ULL * 1024ULL)
 
 /**
  * The maximum length of the prequel in a WriteSpans message.
@@ -490,8 +490,8 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
 
 #define DEFAULT_TRID_STR        "DefaultTrid"
 #define DEFAULT_TRID_STR_LEN    (sizeof(DEFAULT_TRID_STR) - 1)
-#define SPANS_STR               "Spans"
-#define SPANS_STR_LEN           (sizeof(SPANS_STR) - 1)
+#define NUM_SPANS_STR               "NumSpans"
+#define NUM_SPANS_STR_LEN           (sizeof(NUM_SPANS_STR) - 1)
 
 /**
  * Write the prequel to the WriteSpans message.
@@ -511,10 +511,10 @@ static int add_writespans_prequel(struct htraced_rcv *rcv,
     if (!cmp_write_str(ctx, rcv->tracer->trid, strlen(rcv->tracer->trid))) {
         return -1;
     }
-    if (!cmp_write_fixstr(ctx, SPANS_STR, SPANS_STR_LEN)) {
+    if (!cmp_write_fixstr(ctx, NUM_SPANS_STR, NUM_SPANS_STR_LEN)) {
         return -1;
     }
-    if (!cmp_write_array(ctx, sbuf->num_spans)) {
+    if (!cmp_write_uint(ctx, sbuf->num_spans)) {
         return -1;
     }
     return bctx.off;

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/Godeps/Godeps.json
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/Godeps/Godeps.json b/htrace-htraced/go/Godeps/Godeps.json
index 7c737fe..2db37be 100644
--- a/htrace-htraced/go/Godeps/Godeps.json
+++ b/htrace-htraced/go/Godeps/Godeps.json
@@ -24,7 +24,7 @@
         },
         {
             "ImportPath": "github.com/ugorji/go/codec",
-            "Rev": "1a8bf87a90ddcdc7deaa0038f127ac62135fdd58"
+            "Rev": "ea9cd21fa0bc41ee4bdd50ac7ed8cbc7ea2ed960"
         }
     ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 65b04e4..a2a6f8b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -142,26 +142,36 @@ func (hcl *Client) FindSpan(sid common.SpanId) (*common.Span, error) {
 	return &span, nil
 }
 
-func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
+func (hcl *Client) WriteSpans(spans []*common.Span) error {
 	if hcl.hrpcAddr == "" {
-		return hcl.writeSpansHttp(req)
+		return hcl.writeSpansHttp(spans)
 	}
 	hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks)
 	if err != nil {
 		return err
 	}
 	defer hcr.Close()
-	return hcr.writeSpans(req)
+	return hcr.writeSpans(spans)
 }
 
-func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
+func (hcl *Client) writeSpansHttp(spans []*common.Span) error {
+	req := common.WriteSpansReq {
+		NumSpans: len(spans),
+	}
 	var w bytes.Buffer
 	enc := json.NewEncoder(&w)
 	err := enc.Encode(req)
 	if err != nil {
-		return errors.New(fmt.Sprintf("Error serializing span: %s",
+		return errors.New(fmt.Sprintf("Error serializing WriteSpansReq: %s",
 			err.Error()))
 	}
+	for spanIdx := range(spans) {
+		err := enc.Encode(spans[spanIdx])
+		if err != nil {
+			return errors.New(fmt.Sprintf("Error serializing span %d out " +
+				"of %d: %s", spanIdx, len(spans), err.Error()))
+		}
+	}
 	_, _, err = hcl.makeRestRequest("POST", "writeSpans", &w)
 	if err != nil {
 		return err

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
index 2fcd9a0..43f0c6c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
@@ -41,20 +41,41 @@ type HrpcClientCodec struct {
 	testHooks *TestHooks
 }
 
-func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error {
-	methodId := common.HrpcMethodNameToId(req.ServiceMethod)
+func (cdc *HrpcClientCodec) WriteRequest(rr *rpc.Request, msg interface{}) error {
+	methodId := common.HrpcMethodNameToId(rr.ServiceMethod)
 	if methodId == common.METHOD_ID_NONE {
 		return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method name %s",
-			req.ServiceMethod))
+			rr.ServiceMethod))
 	}
 	mh := new(codec.MsgpackHandle)
 	mh.WriteExt = true
 	w := bytes.NewBuffer(make([]byte, 0, 2048))
+
+	var err error
 	enc := codec.NewEncoder(w, mh)
-	err := enc.Encode(msg)
-	if err != nil {
-		return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
-			"message as msgpack: %s", err.Error()))
+	if methodId == common.METHOD_ID_WRITE_SPANS {
+		spans := msg.([]*common.Span)
+		req := &common.WriteSpansReq {
+			NumSpans: len(spans),
+		}
+		err = enc.Encode(req)
+		if err != nil {
+			return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
+				"message as msgpack: %s", err.Error()))
+		}
+		for spanIdx := range(spans) {
+			err = enc.Encode(spans[spanIdx])
+			if err != nil {
+				return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
+					"span %d out of %d as msgpack: %s", spanIdx, len(spans), err.Error()))
+			}
+		}
+	} else {
+		err = enc.Encode(msg)
+		if err != nil {
+			return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to marshal "+
+				"message as msgpack: %s", err.Error()))
+		}
 	}
 	buf := w.Bytes()
 	if len(buf) > common.MAX_HRPC_BODY_LENGTH {
@@ -65,7 +86,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro
 	hdr := common.HrpcRequestHeader{
 		Magic:    common.HRPC_MAGIC,
 		MethodId: methodId,
-		Seq:      req.Seq,
+		Seq:      rr.Seq,
 		Length:   uint32(len(buf)),
 	}
 	err = binary.Write(cdc.rwc, binary.LittleEndian, &hdr)
@@ -154,9 +175,9 @@ func newHClient(hrpcAddr string, testHooks *TestHooks) (*hClient, error) {
 	return &hcr, nil
 }
 
-func (hcr *hClient) writeSpans(req *common.WriteSpansReq) error {
+func (hcr *hClient) writeSpans(spans []*common.Span) error {
 	resp := common.WriteSpansResp{}
-	return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, req, &resp)
+	return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, spans, &resp)
 }
 
 func (hcr *hClient) Close() {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 2ec5fe9..5f02db6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -34,13 +34,13 @@ const METHOD_NAME_WRITE_SPANS = "HrpcHandler.WriteSpans"
 const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024
 
 // Maximum length of HRPC message body
-const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
+const MAX_HRPC_BODY_LENGTH = 32 * 1024 * 1024
 
 // A request to write spans to htraced.
+// This request is followed by a sequence of spans.
 type WriteSpansReq struct {
-	Addr        string `json:",omitempty"` // This gets filled in by the RPC layer.
 	DefaultTrid string `json:",omitempty"`
-	Spans       []*Span
+	NumSpans    int
 }
 
 // Info returned by /server/version

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 3a877f6..7b64914 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -114,9 +114,7 @@ func TestClientOperations(t *testing.T) {
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
 
 	// Write half of the spans to htraced via the client.
-	err = hcl.WriteSpans(&common.WriteSpansReq{
-		Spans: allSpans[0 : NUM_TEST_SPANS/2],
-	})
+	err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2])
 	if err != nil {
 		t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
 			err.Error())
@@ -209,9 +207,7 @@ func TestDumpAll(t *testing.T) {
 	NUM_TEST_SPANS := 100
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
 	sort.Sort(allSpans)
-	err = hcl.WriteSpans(&common.WriteSpansReq{
-		Spans: allSpans,
-	})
+	err = hcl.WriteSpans(allSpans)
 	if err != nil {
 		t.Fatalf("WriteSpans failed: %s\n", err.Error())
 	}
@@ -325,9 +321,7 @@ func TestHrpcAdmissionsControl(t *testing.T) {
 	allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
 	for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
 		go func(i int) {
-			err = hcl.WriteSpans(&common.WriteSpansReq{
-				Spans: allSpans[i : i+1],
-			})
+			err = hcl.WriteSpans(allSpans[i : i+1])
 			if err != nil {
 				t.Fatalf("WriteSpans failed: %s\n", err.Error())
 			}
@@ -379,9 +373,7 @@ func TestHrpcIoTimeout(t *testing.T) {
 			// Keep in mind that we only block until we have seen
 			// TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
 			// we let requests through so that the test can exit cleanly.
-			hcl.WriteSpans(&common.WriteSpansReq{
-				Spans: allSpans[i : i+1],
-			})
+			hcl.WriteSpans(allSpans[i : i+1])
 		}(iter)
 	}
 	for {
@@ -398,6 +390,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
 	htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans",
 		Cnf: map[string]string{
 			conf.HTRACE_LOG_LEVEL: "INFO",
+			conf.HTRACE_NUM_HRPC_HANDLERS: "20",
 		},
 		WrittenSpans: common.NewSemaphore(int64(1 - N)),
 	}
@@ -416,7 +409,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
 	// body length limit.  TODO: a production-quality golang client would do
 	// this internally rather than needing us to do it here in the unit test.
 	bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
-	reqs := make([]*common.WriteSpansReq, 0, 4)
+	reqs := make([][]*common.Span, 0, 4)
 	curReq := -1
 	curReqLen := bodyLen
 	var curReqSpans uint32
@@ -429,7 +422,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
 		span := allSpans[n]
 		if (curReqSpans >= maxSpansPerRpc) ||
 			(curReqLen >= bodyLen) {
-			reqs = append(reqs, &common.WriteSpansReq{})
+			reqs = append(reqs, make([]*common.Span, 0, 16))
 			curReqLen = 0
 			curReq++
 			curReqSpans = 0
@@ -446,7 +439,7 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
 			panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen))
 		}
 		curReqLen += bufLen
-		reqs[curReq].Spans = append(reqs[curReq].Spans, span)
+		reqs[curReq] = append(reqs[curReq], span)
 		curReqSpans++
 	}
 	ht.Store.lg.Infof("num spans: %d.  num WriteSpansReq calls: %d\n", N, len(reqs))
@@ -465,13 +458,13 @@ func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
 
 	// Write many random spans.
 	for reqIdx := range reqs {
-		go func() {
-			err = hcl.WriteSpans(reqs[reqIdx])
+		go func(i int) {
+			err = hcl.WriteSpans(reqs[i])
 			if err != nil {
 				panic(fmt.Sprintf("failed to send WriteSpans request %d: %s",
-					reqIdx, err.Error()))
+					i, err.Error()))
 			}
-		}()
+		}(reqIdx)
 	}
 	// Wait for all the spans to be written.
 	ht.Store.WrittenSpans.Wait()

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 4fc400a..ebf3c47 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -478,9 +478,7 @@ func TestReloadDataStore(t *testing.T) {
 	// Create some random trace spans.
 	NUM_TEST_SPANS := 5
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-	err = hcl.WriteSpans(&common.WriteSpansReq{
-		Spans: allSpans,
-	})
+	err = hcl.WriteSpans(allSpans)
 	if err != nil {
 		t.Fatalf("WriteSpans failed: %s\n", err.Error())
 	}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index a6f6751..ecd13d4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -32,7 +32,6 @@ import (
 	"net/rpc"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
-	"reflect"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -101,6 +100,13 @@ type HrpcServerCodec struct {
 
 	// The number of messages this connection has handled.
 	numHandled int
+
+	// The buffer for reading requests.  These buffers are reused for multiple
+	// requests to avoid allocating memory.
+	buf []byte
+
+	// Configuration for msgpack decoding
+	msgpackHandle codec.MsgpackHandle
 }
 
 func asJson(val interface{}) string {
@@ -164,29 +170,55 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
 }
 
 func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+	remoteAddr := cdc.conn.RemoteAddr().String()
 	if cdc.lg.TraceEnabled() {
 		cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
-			cdc.conn.RemoteAddr(), cdc.length)
+			remoteAddr, cdc.length)
+	}
+	if cap(cdc.buf) < int(cdc.length) {
+		var pow uint
+		for pow=0;(1<<pow) < int(cdc.length);pow++ {
+		}
+		cdc.buf = make([]byte, 0, 1<<pow)
 	}
-	mh := new(codec.MsgpackHandle)
-	mh.WriteExt = true
-	dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh)
-	err := dec.Decode(body)
+	_, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
 	if err != nil {
 		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+
 			"request body: %s", cdc.length, err.Error()))
 	}
+	var zeroTime time.Time
+	cdc.conn.SetDeadline(zeroTime)
+
+	dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
+	err = dec.Decode(body)
 	if cdc.lg.TraceEnabled() {
-		cdc.lg.Tracef("%s: read %d-byte request body %s\n",
-			cdc.conn.RemoteAddr(), cdc.length, asJson(&body))
+		cdc.lg.Tracef("%s: read HRPC message: %s\n",
+			remoteAddr, asJson(&body))
 	}
-	val := reflect.ValueOf(body)
-	addr := val.Elem().FieldByName("Addr")
-	if addr.IsValid() {
-		addr.SetString(cdc.conn.RemoteAddr().String())
+	req := body.(*common.WriteSpansReq)
+	if req == nil {
+		return nil
 	}
-	var zeroTime time.Time
-	cdc.conn.SetDeadline(zeroTime)
+	// We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage
+	// collector with a ton of trace spans all at once.
+	startTime := time.Now()
+	client, _, err := net.SplitHostPort(remoteAddr)
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+
+			"for %s: %s\n", remoteAddr, err.Error()))
+	}
+	hand := cdc.hsv.hand
+	ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+	for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
+		var span *common.Span
+		err := dec.Decode(&span)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d " +
+				"out of %d: %s\n", spanIdx, req.NumSpans, err.Error()))
+		}
+		ing.IngestSpan(span)
+	}
+	ing.Close(startTime)
 	return nil
 }
 
@@ -197,10 +229,8 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
 	var err error
 	buf := EMPTY
 	if msg != nil {
-		mh := new(codec.MsgpackHandle)
-		mh.WriteExt = true
 		w := bytes.NewBuffer(make([]byte, 0, 128))
-		enc := codec.NewEncoder(w, mh)
+		enc := codec.NewEncoder(w, &cdc.msgpackHandle)
 		err := enc.Encode(msg)
 		if err != nil {
 			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
@@ -257,20 +287,8 @@ func (cdc *HrpcServerCodec) Close() error {
 }
 
 func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
-	resp *common.WriteSpansResp) (err error) {
-	startTime := time.Now()
-	hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s).  "+
-		"defaultTrid = %s\n", len(req.Spans), req.DefaultTrid)
-	client, _, err := net.SplitHostPort(req.Addr)
-	if err != nil {
-		return errors.New(fmt.Sprintf("Failed to split host and port "+
-			"for %s: %s\n", req.Addr, err.Error()))
-	}
-	ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
-	for spanIdx := range req.Spans {
-		ing.IngestSpan(req.Spans[spanIdx])
-	}
-	ing.Close(startTime)
+		resp *common.WriteSpansResp) (err error) {
+	// Nothing to do here; WriteSpans is handled in ReadRequestBody.
 	return nil
 }
 
@@ -303,6 +321,9 @@ func CreateHrpcServer(cnf *conf.Config, store *dataStore,
 		hsv.cdcs <- &HrpcServerCodec{
 			lg:  lg,
 			hsv: hsv,
+			msgpackHandle: codec.MsgpackHandle {
+				WriteExt: true,
+			},
 		}
 	}
 	var err error

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index bad7889..6daf640 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -118,9 +118,7 @@ func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
 
 	NUM_TEST_SPANS := 12
 	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-	err = hcl.WriteSpans(&common.WriteSpansReq{
-		Spans: allSpans,
-	})
+	err = hcl.WriteSpans(allSpans)
 	if err != nil {
 		t.Fatalf("WriteSpans failed: %s\n", err.Error())
 	}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index da82912..74ec0cf 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -24,8 +24,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/gorilla/mux"
-	"io"
-	"io/ioutil"
 	"net"
 	"net/http"
 	"org/apache/htrace/common"
@@ -230,34 +228,32 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
 				req.RemoteAddr, serr.Error()))
 		return
 	}
-	var dec *json.Decoder
-	if hand.lg.TraceEnabled() {
-		b, err := ioutil.ReadAll(req.Body)
-		if err != nil {
-			writeError(hand.lg, w, http.StatusBadRequest,
-				fmt.Sprintf("Error reading span data: %s", err.Error()))
-			return
-		}
-		hand.lg.Tracef("writeSpansHandler: read %s\n", string(b))
-		dec = json.NewDecoder(bytes.NewBuffer(b))
-	} else {
-		dec = json.NewDecoder(req.Body)
-	}
+	dec := json.NewDecoder(req.Body)
 	var msg common.WriteSpansReq
 	err := dec.Decode(&msg)
-	if (err != nil) && (err != io.EOF) {
+	if (err != nil) {
 		writeError(hand.lg, w, http.StatusBadRequest,
 			fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error()))
 		return
 	}
-	hand.lg.Debugf("writeSpansHandler: received %d span(s).  defaultTrid = %s\n",
-		len(msg.Spans), msg.DefaultTrid)
-
+	if hand.lg.TraceEnabled() {
+		hand.lg.Tracef("%s: read WriteSpans REST message: %s\n",
+			req.RemoteAddr, asJson(&msg))
+	}
 	ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
-	for spanIdx := range msg.Spans {
-		ing.IngestSpan(msg.Spans[spanIdx])
+	for spanIdx := 0; spanIdx < msg.NumSpans; spanIdx++ {
+		var span *common.Span
+		err := dec.Decode(&span)
+		if err != nil {
+			writeError(hand.lg, w, http.StatusBadRequest,
+				fmt.Sprintf("Failed to decode span %d out of %d: ",
+					spanIdx, msg.NumSpans, err.Error()))
+			return
+		}
+		ing.IngestSpan(span)
 	}
 	ing.Close(startTime)
+	return
 }
 
 type queryHandler struct {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 9837e94..2eff0a8 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -362,9 +362,7 @@ func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
 		}
 		fmt.Printf("\n")
 	}
-	err = hcl.WriteSpans(&common.WriteSpansReq{
-		Spans: spans,
-	})
+	err = hcl.WriteSpans(spans)
 	if err != nil {
 		fmt.Println(err.Error())
 		return EXIT_FAILURE

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
index 3206dd6..e5059f7 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
@@ -113,11 +113,11 @@ class Conf {
    */
   final static String BUFFER_SIZE_KEY =
       "htraced.receiver.buffer.size";
-  final static int BUFFER_SIZE_DEFAULT = 48 * 1024 * 1024;
+  final static int BUFFER_SIZE_DEFAULT = 16 * 1024 * 1024;
   static int BUFFER_SIZE_MIN = 4 * 1024 * 1024;
   // The maximum buffer size should not be longer than
   // PackedBuffer.MAX_HRPC_BODY_LENGTH.
-  final static int BUFFER_SIZE_MAX = 63 * 1024 * 1024;
+  final static int BUFFER_SIZE_MAX = 32 * 1024 * 1024;
 
   /**
    * Set the fraction of the span buffer which needs to fill up before we

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
index f867ad7..dd0a4b9 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
@@ -78,7 +78,7 @@ class PackedBuffer {
 
   private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
   private static final Charset UTF8 = StandardCharsets.UTF_8;
-  private static final byte SPANS[] = "Spans".getBytes(UTF8);
+  private static final byte NUM_SPANS[] = "NumSpans".getBytes(UTF8);
   private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8);
   private static final byte A[] = "a".getBytes(UTF8);
   private static final byte B[] = "b".getBytes(UTF8);
@@ -401,9 +401,9 @@ class PackedBuffer {
         packer.writePayload(DEFAULT_PID);
         packer.packString(defaultPid);
       }
-      packer.packRawStringHeader(SPANS.length);
-      packer.writePayload(SPANS);
-      packer.packArrayHeader(numSpans);
+      packer.packRawStringHeader(NUM_SPANS.length);
+      packer.writePayload(NUM_SPANS);
+      packer.packInt(numSpans);
       packer.flush();
       success = true;
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/35053cfc/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
index 377d19f..39e5f99 100644
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java
@@ -42,15 +42,12 @@ import org.eclipse.jetty.http.HttpStatus;
 class RestBufferManager implements BufferManager {
   private static final Log LOG = LogFactory.getLog(RestBufferManager.class);
   private static final Charset UTF8 = Charset.forName("UTF-8");
-  private static final byte COMMA_BYTE = (byte)0x2c;
   private static final int MAX_PREQUEL_LENGTH = 512;
-  private static final int MAX_EPILOGUE_LENGTH = 32;
   private final Conf conf;
   private final HttpClient httpClient;
   private final String urlString;
   private final ByteBuffer prequel;
   private final ByteBuffer spans;
-  private final ByteBuffer epilogue;
   private int numSpans;
 
   private static class RestBufferManagerContentProvider
@@ -122,7 +119,6 @@ class RestBufferManager implements BufferManager {
         conf.endpoint.getPort(), "/writeSpans").toString();
     this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH);
     this.spans = ByteBuffer.allocate(conf.bufferSize);
-    this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH);
     clear();
     this.httpClient.start();
   }
@@ -130,11 +126,10 @@ class RestBufferManager implements BufferManager {
   @Override
   public void writeSpan(Span span) throws IOException {
     byte[] spanJsonBytes = span.toString().getBytes(UTF8);
-    if ((spans.capacity() - spans.position()) < (spanJsonBytes.length + 1)) {
-      // Make sure we have enough space for the span JSON and a comma.
+    if ((spans.capacity() - spans.position()) < spanJsonBytes.length) {
+      // Make sure we have enough space for the span JSON.
       throw new IOException("Not enough space remaining in span buffer.");
     }
-    spans.put(COMMA_BYTE);
     spans.put(spanJsonBytes);
     numSpans++;
   }
@@ -151,16 +146,14 @@ class RestBufferManager implements BufferManager {
 
   @Override
   public void prepare() throws IOException {
-    String prequelString = "{\"Spans\":[";
+    StringBuilder bld = new StringBuilder();
+    bld.append("{\"NumSpans\":").append(numSpans).append("}");
+    String prequelString = bld.toString();
     prequel.put(prequelString.getBytes(UTF8));
     prequel.flip();
 
     spans.flip();
 
-    String epilogueString = "]}";
-    epilogue.put(epilogueString.toString().getBytes(UTF8));
-    epilogue.flip();
-
     if (LOG.isTraceEnabled()) {
       LOG.trace("Preparing to send " + contentLength() + " bytes of span " +
           "data to " + conf.endpointStr + ", containing " + numSpans +
@@ -172,12 +165,11 @@ class RestBufferManager implements BufferManager {
   public void flush() throws IOException {
     // Position the buffers at the beginning.
     prequel.position(0);
-    spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma
-    epilogue.position(0);
+    spans.position(0);
 
     RestBufferManagerContentProvider contentProvider =
         new RestBufferManagerContentProvider(
-            new ByteBuffer[] { prequel, spans, epilogue });
+            new ByteBuffer[] { prequel, spans });
     long rpcLength = contentProvider.getLength();
     try {
       Request request = httpClient.
@@ -206,7 +198,6 @@ class RestBufferManager implements BufferManager {
   public void clear() {
     prequel.clear();
     spans.clear();
-    epilogue.clear();
     numSpans = 0;
   }
 


Mime
View raw message