qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [10/50] [abbrv] qpid-proton git commit: PROTON-1308: Go: Support Idle Timeout setting in electron Transport
Date Wed, 02 Nov 2016 02:53:30 GMT
PROTON-1308: Go: Support Idle Timeout setting in electron Transport

electron.Heartbeat(time.Duration) returns a ConnectionOption that will set the
required heartbeat interval for the remote peer. See electron.Heartbeat
and electron.ConnectionSettings.Heartbeat

NOTE: The term "Heartbeat" was chosen to avoid the ambiguous term
"idle-timeout". Heartbeat *always* refers to the max allowed duration
between *sending* frames. The frame receiver waits for 2*remote-heartbeat before
closing a connection.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ce80f9d7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ce80f9d7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ce80f9d7

Branch: refs/heads/go1
Commit: ce80f9d7250400cbe47bb5bf0ced3937ed829d92
Parents: e7b70d8
Author: Alan Conway <aconway@redhat.com>
Authored: Wed Sep 28 13:00:22 2016 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Wed Sep 28 16:39:05 2016 -0400

----------------------------------------------------------------------
 proton-c/bindings/go/genwrap.go                 |  1 +
 .../src/qpid.apache.org/electron/auth_test.go   |  6 +-
 .../src/qpid.apache.org/electron/connection.go  | 49 ++++++++++++----
 .../qpid.apache.org/electron/electron_test.go   | 61 ++++++++++++++++++++
 .../go/src/qpid.apache.org/electron/handler.go  |  2 +
 .../go/src/qpid.apache.org/proton/engine.go     | 38 +++++++++---
 .../go/src/qpid.apache.org/proton/wrappers.go   | 36 ++++++++----
 .../src/qpid.apache.org/proton/wrappers_gen.go  |  2 +-
 8 files changed, 160 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go
index f295a32..8a9af03 100644
--- a/proton-c/bindings/go/genwrap.go
+++ b/proton-c/bindings/go/genwrap.go
@@ -317,6 +317,7 @@ func mapType(ctype string) (g genType) {
 	case "C.pn_millis_t":
 		g.Gotype = "time.Duration"
 		g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Millisecond)",
v) }
+		g.ToC = func(v string) string { return fmt.Sprintf("C.pn_millis_t(%s/time.Millisecond)",
v) }
 	case "C.pn_timestamp_t":
 		g.Gotype = "time.Time"
 		g.ToC = func(v string) string { return fmt.Sprintf("pnTime(%s)", v) }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
index a090b78..73a9299 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
@@ -53,7 +53,7 @@ func TestAuthAnonymous(t *testing.T) {
 		[]ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
 		[]ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
 	fatalIf(t, err)
-	errorIf(t, checkEqual(connectionSettings{"anonymous", "vhost"}, got))
+	errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
 }
 
 func TestAuthPlain(t *testing.T) {
@@ -62,7 +62,7 @@ func TestAuthPlain(t *testing.T) {
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"),
Password([]byte("xxx"))},
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
 	fatalIf(t, err)
-	errorIf(t, checkEqual(connectionSettings{"fred@proton", ""}, got))
+	errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
 }
 
 func TestAuthBadPass(t *testing.T) {
@@ -118,7 +118,7 @@ func configureSASL() error {
 func TestMain(m *testing.M) {
 	status := m.Run()
 	if confDir != "" {
-		os.RemoveAll(confDir)
+		_ = os.RemoveAll(confDir)
 	}
 	os.Exit(status)
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 00c08ad..7f3050f 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -45,6 +45,11 @@ type ConnectionSettings interface {
 	//
 	// Returns error if the connection fails to authenticate.
 	VirtualHost() string
+
+	// Heartbeat is the maximum delay between sending frames that the remote peer
+	// has requested of us. If the interval expires an empty "heartbeat" frame
+	// will be sent automatically to keep the connection open.
+	Heartbeat() time.Duration
 }
 
 // Connection is an AMQP connection, created by a Container.
@@ -88,10 +93,12 @@ type Connection interface {
 
 type connectionSettings struct {
 	user, virtualHost string
+	heartbeat         time.Duration
 }
 
-func (c connectionSettings) User() string        { return c.user }
-func (c connectionSettings) VirtualHost() string { return c.virtualHost }
+func (c connectionSettings) User() string             { return c.user }
+func (c connectionSettings) VirtualHost() string      { return c.virtualHost }
+func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
 
 // ConnectionOption can be passed when creating a connection to configure various options
 type ConnectionOption func(*connection)
@@ -165,7 +172,7 @@ type connection struct {
 }
 
 // NewConnection creates a connection with the given options.
-func NewConnection(conn net.Conn, setting ...ConnectionOption) (*connection, error) {
+func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
 	c := &connection{
 		conn: conn,
 	}
@@ -176,7 +183,7 @@ func NewConnection(conn net.Conn, setting ...ConnectionOption) (*connection,
err
 		return nil, err
 	}
 	c.pConnection = c.engine.Connection()
-	for _, set := range setting {
+	for _, set := range opts {
 		set(c)
 	}
 	if c.container == nil {
@@ -211,7 +218,7 @@ func (c *connection) Disconnect(err error) {
 	c.engine.Disconnect(err)
 }
 
-func (c *connection) Session(setting ...SessionOption) (Session, error) {
+func (c *connection) Session(opts ...SessionOption) (Session, error) {
 	var s Session
 	err := c.engine.InjectWait(func() error {
 		if c.Error() != nil {
@@ -221,7 +228,7 @@ func (c *connection) Session(setting ...SessionOption) (Session, error)
{
 		if err == nil {
 			pSession.Open()
 			if err == nil {
-				s = newSession(c, pSession, setting...)
+				s = newSession(c, pSession, opts...)
 			}
 		}
 		return err
@@ -241,17 +248,17 @@ func (c *connection) DefaultSession() (s Session, err error) {
 	return c.defaultSession, err
 }
 
-func (c *connection) Sender(setting ...LinkOption) (Sender, error) {
+func (c *connection) Sender(opts ...LinkOption) (Sender, error) {
 	if s, err := c.DefaultSession(); err == nil {
-		return s.Sender(setting...)
+		return s.Sender(opts...)
 	} else {
 		return nil, err
 	}
 }
 
-func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) {
+func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) {
 	if s, err := c.DefaultSession(); err == nil {
-		return s.Receiver(setting...)
+		return s.Receiver(opts...)
 	} else {
 		return nil, err
 	}
@@ -288,11 +295,20 @@ func newIncomingConnection(c *connection) *IncomingConnection {
 		c:                  c}
 }
 
-func (in *IncomingConnection) Accept() Endpoint {
+// AcceptConnection is like Accept() but takes ConnectionOption s
+// For example you can set the Heartbeat() for the accepted connection.
+func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
 	return in.accept(func() Endpoint {
+		for _, opt := range opts {
+			opt(in.c)
+		}
 		in.c.pConnection.Open()
 		return in.c
-	})
+	}).(Connection)
+}
+
+func (in *IncomingConnection) Accept() Endpoint {
+	return in.AcceptConnection()
 }
 
 func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
@@ -325,6 +341,15 @@ func SASLAllowInsecure(b bool) ConnectionOption {
 	return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
 }
 
+// Heartbeat returns a ConnectionOption that requests the maximum delay
+// between sending frames for the remote peer. If we don't receive any frames
+// within 2*delay we will close the connection.
+//
+func Heartbeat(delay time.Duration) ConnectionOption {
+	// Proton-C divides the idle-timeout by 2 before sending, so compensate.
+	return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) }
+}
+
 // GlobalSASLConfigDir sets the SASL configuration directory for every
 // Connection created in this process. If not called, the default is determined
 // by your SASL installation.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
index aa37d57..294e952 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
@@ -483,3 +483,64 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
 		}
 	}
 }
+
+func heartbeat(c Connection) time.Duration {
+	return c.(*connection).engine.Transport().RemoteIdleTimeout()
+}
+
+func TestHeartbeat(t *testing.T) {
+	client, server := newClientServerOpts(t,
+		[]ConnectionOption{Heartbeat(102 * time.Millisecond)},
+		nil)
+	defer closeClientServer(client, server)
+
+	var serverHeartbeat time.Duration
+
+	go func() {
+		for in := range server.Incoming() {
+			switch in := in.(type) {
+			case *IncomingConnection:
+				serverHeartbeat = in.Heartbeat()
+				in.AcceptConnection(Heartbeat(101 * time.Millisecond))
+			default:
+				in.Accept()
+			}
+		}
+	}()
+
+	// Freeze the server to stop it sending heartbeats.
+	unfreeze := make(chan bool)
+	defer close(unfreeze)
+	freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze
}) }
+
+	fatalIf(t, client.Sync())
+	errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
+	errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
+	errorIf(t, client.Connection().Error())
+
+	// Freeze the server for less than a heartbeat
+	fatalIf(t, freeze())
+	time.Sleep(50 * time.Millisecond)
+	unfreeze <- true
+	// Make sure server is still responding.
+	s, err := client.Sender()
+	errorIf(t, err)
+	errorIf(t, s.Sync())
+
+	// Freeze the server till the client times out the connection
+	fatalIf(t, freeze())
+	select {
+	case <-client.Done():
+		if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
+			t.Error("bad timeout error:", client.Error())
+		}
+	case <-time.After(400 * time.Millisecond):
+		t.Error("connection failed to time out")
+	}
+
+	unfreeze <- true // Unfreeze the server
+	<-server.Done()
+	if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
+		t.Error("bad timeout error:", server.Error())
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 2a426aa..588ba79 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -79,6 +79,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		}
 
 	case proton.MConnectionOpening:
+		h.connection.heartbeat = e.Transport().RemoteIdleTimeout()
 		if e.Connection().State().LocalUninit() { // Remotely opened
 			h.incoming(newIncomingConnection(h.connection))
 		}
@@ -137,6 +138,7 @@ func (h *handler) incoming(in Incoming) {
 	var err error
 	if h.connection.incoming != nil {
 		h.connection.incoming <- in
+		// Must block until accept/reject, subsequent events may use the incoming endpoint.
 		err = in.wait()
 	} else {
 		err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 6439fc1..7ba6827 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -107,6 +107,7 @@ type Engine struct {
 	handlers  []EventHandler // Handlers for proton events.
 	running   chan struct{}  // This channel will be closed when the goroutines are done.
 	closeOnce sync.Once
+	timer     *time.Timer
 }
 
 const bufferSize = 4096
@@ -123,6 +124,7 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error)
{
 		conn:     conn,
 		handlers: handlers,
 		running:  make(chan struct{}),
+		timer:    time.NewTimer(0),
 	}
 	if pnErr := C.pn_connection_engine_init(&eng.engine); pnErr != 0 {
 		return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr))
@@ -237,17 +239,34 @@ func (eng *Engine) Disconnect(err error) {
 	<-eng.running
 }
 
+// Let proton run timed activity and set up the next tick
+func (eng *Engine) tick() {
+	now := time.Now()
+	next := eng.Transport().Tick(now)
+	if !next.IsZero() {
+		eng.timer.Reset(next.Sub(now))
+	}
+}
+
 func (eng *Engine) dispatch() bool {
+	var needTick bool // Set if we need to tick the transport.
 	for {
 		if cevent := C.pn_connection_engine_dispatch(&eng.engine); cevent != nil {
 			event := makeEvent(cevent, eng)
 			for _, h := range eng.handlers {
+				switch event.Type() {
+				case ETransport:
+					needTick = true
+				}
 				h.HandleEvent(event)
 			}
 		} else {
 			break
 		}
 	}
+	if needTick {
+		eng.tick()
+	}
 	return !bool(C.pn_connection_engine_finished(&eng.engine))
 }
 
@@ -285,10 +304,10 @@ func (eng *Engine) Run() error {
 			if n > 0 {
 				readsOut <- rbuf[:n]
 			} else if err != nil {
-				eng.inject <- func() {
+				_ = eng.Inject(func() {
 					eng.Transport().Condition().SetError(err)
 					C.pn_connection_engine_read_close(&eng.engine)
-				}
+				})
 				return
 			}
 		}
@@ -305,10 +324,10 @@ func (eng *Engine) Run() error {
 			if n > 0 {
 				writesOut <- wbuf[:n]
 			} else if err != nil {
-				eng.inject <- func() {
+				_ = eng.Inject(func() {
 					eng.Transport().Condition().SetError(err)
 					C.pn_connection_engine_write_close(&eng.engine)
-				}
+				})
 				return
 			}
 		}
@@ -353,6 +372,9 @@ func (eng *Engine) Run() error {
 			if ok {
 				f()
 			}
+
+		case <-eng.timer.C:
+			eng.tick()
 		}
 	}
 
@@ -360,18 +382,16 @@ func (eng *Engine) Run() error {
 	eng.err.Set(eng.Transport().Condition().Error())
 	close(readsIn)
 	close(writesIn)
-	_ = eng.conn.Close() // Make sure connection is closed
+	close(eng.running)   // Signal goroutines have exited and Error is set, disable Inject()
+	_ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject)
 	wait.Wait()          // Wait for goroutines
 
-	close(eng.running) // Signal goroutines have exited and Error is set.
-
-	C.pn_connection_engine_final(&eng.engine)
-
 	for _, h := range eng.handlers {
 		switch h := h.(type) {
 		case cHandler:
 			C.pn_handler_free(h.pn)
 		}
 	}
+	C.pn_connection_engine_final(&eng.engine)
 	return eng.err.Get()
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index fa3e850..1dee743 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -291,6 +291,10 @@ func (s Session) Receiver(name string) Link {
 	return Link{C.pn_receiver(s.pn, cname)}
 }
 
+func (t Transport) String() string {
+	return fmt.Sprintf("(Transport)(%p)", t.CPtr())
+}
+
 // Unique (per process) string identifier for a connection, useful for debugging.
 func (c Connection) String() string {
 	// Use the transport address to match the default transport logs from PN_TRACE.
@@ -374,19 +378,31 @@ func (c Connection) Session() (Session, error) {
 }
 
 // pnTime converts Go time.Time to Proton millisecond Unix time.
-func pnTime(t time.Time) C.pn_timestamp_t {
-	secs := t.Unix()
-	// Note: sub-second accuracy is not guaraunteed if the Unix time in
-	// nanoseconds cannot be represented by an int64 (sometime around year 2260)
-	msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
-	return C.pn_timestamp_t(secs*1000 + msecs)
+//
+// Note: t.isZero() is converted to C.pn_timestamp_t(0) and vice-versa. These
+// are used as "not set" sentinel values by the Go and Proton APIs, so it is
+// better to conserve the "zeroness" even though they don't represent the same
+// time instant.
+//
+func pnTime(t time.Time) (pnt C.pn_timestamp_t) {
+	if !t.IsZero() {
+		pnt = C.pn_timestamp_t(t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond))
+	}
+	return
 }
 
 // goTime converts a pn_timestamp_t to a Go time.Time.
-func goTime(t C.pn_timestamp_t) time.Time {
-	secs := int64(t) / 1000
-	nsecs := (int64(t) % 1000) * int64(time.Millisecond)
-	return time.Unix(secs, nsecs)
+//
+// Note: C.pn_timestamp_t(0) is converted to a zero time.Time and
+// vice-versa. These are used as "not set" sentinel values by the Go and Proton
+// APIs, so it is better to conserve the "zeroness" even though they don't
+// represent the same time instant.
+//
+func goTime(pnt C.pn_timestamp_t) (t time.Time) {
+	if pnt != 0 {
+		t = time.Unix(int64(pnt/1000), int64(pnt%1000)*int64(time.Millisecond))
+	}
+	return
 }
 
 // Special treatment for Transport.Head, return value is unsafe.Pointer not string

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
index 38c76cc..629caa6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
@@ -812,7 +812,7 @@ func (t Transport) IdleTimeout() time.Duration {
 	return (time.Duration(C.pn_transport_get_idle_timeout(t.pn)) * time.Millisecond)
 }
 func (t Transport) SetIdleTimeout(timeout time.Duration) {
-	C.pn_transport_set_idle_timeout(t.pn, C.pn_millis_t(timeout))
+	C.pn_transport_set_idle_timeout(t.pn, C.pn_millis_t(timeout/time.Millisecond))
 }
 func (t Transport) RemoteIdleTimeout() time.Duration {
 	return (time.Duration(C.pn_transport_get_remote_idle_timeout(t.pn)) * time.Millisecond)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message