Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 27180200497 for ; Wed, 23 Aug 2017 19:45:58 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2562616949B; Wed, 23 Aug 2017 17:45:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7257216949C for ; Wed, 23 Aug 2017 19:45:56 +0200 (CEST) Received: (qmail 60143 invoked by uid 500); 23 Aug 2017 17:45:55 -0000 Mailing-List: contact commits-help@mynewt.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mynewt.apache.org Delivered-To: mailing list commits@mynewt.apache.org Received: (qmail 60088 invoked by uid 99); 23 Aug 2017 17:45:55 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Aug 2017 17:45:55 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id E920A815D4; Wed, 23 Aug 2017 17:45:52 +0000 (UTC) Date: Wed, 23 Aug 2017 17:45:57 +0000 To: "commits@mynewt.apache.org" Subject: [mynewt-newtmgr] 05/05: newtmgr - revendor MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: ccollins@apache.org Reply-To: "commits@mynewt.apache.org" In-Reply-To: <150351035235.26175.2801926179725853165@gitbox.apache.org> References: <150351035235.26175.2801926179725853165@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: mynewt-newtmgr X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: cbc994bce4c20908743fff54db94b8326b62339a X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20170823174552.E920A815D4@gitbox.apache.org> archived-at: Wed, 23 Aug 2017 17:45:58 -0000 This is an automated email from the ASF dual-hosted git repository. ccollins pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mynewt-newtmgr.git commit cbc994bce4c20908743fff54db94b8326b62339a Author: Christopher Collins AuthorDate: Fri Aug 18 13:55:40 2017 -0700 newtmgr - revendor --- newtmgr/Godeps/Godeps.json | 56 ++-- .../newtmgr/nmxact/nmble/ble_act.go | 29 +- .../newtmgr/nmxact/nmble/ble_advertiser.go | 4 +- .../newtmgr/nmxact/nmble/ble_scanner.go | 76 +++-- .../newtmgr/nmxact/nmble/ble_sesn.go | 25 +- .../newtmgr/nmxact/nmble/ble_util.go | 46 +++ .../newtmgr/nmxact/nmble/ble_xport.go | 371 +++++++++------------ .../mynewt.apache.org/newtmgr/nmxact/nmble/conn.go | 101 +++--- .../newtmgr/nmxact/nmble/discover.go | 2 +- .../newtmgr/nmxact/nmble/master.go | 135 ++++++++ .../mynewt.apache.org/newtmgr/nmxact/nmble/sync.go | 202 +++++++++++ .../newtmgr/nmxact/nmxutil/block.go | 44 ++- .../newtmgr/nmxact/nmxutil/sres.go | 17 +- .../mynewt.apache.org/newtmgr/nmxact/scan/scan.go | 6 + 14 files changed, 771 insertions(+), 343 deletions(-) diff --git a/newtmgr/Godeps/Godeps.json b/newtmgr/Godeps/Godeps.json index 2671c02..32bc34c 100644 --- a/newtmgr/Godeps/Godeps.json +++ b/newtmgr/Godeps/Godeps.json @@ -127,73 +127,73 @@ }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/adv", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/bledefs", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/mgmt", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmble", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmp", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmserial", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/nmxutil", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/oic", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/omp", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/scan", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/sesn", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/udp", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/xact", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" }, { "ImportPath": "mynewt.apache.org/newtmgr/nmxact/xport", - "Comment": "mynewt_1_1_0_tag-62-gff451f7", - "Rev": "ff451f7f35625b615e8d152dd55ad86c86e04423" + "Comment": "mynewt_1_1_0_tag-68-gd4d3606", + "Rev": "d4d360667b17c421d09f2447b1a4ec6d989d64ae" } ] } diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_act.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_act.go index 0d80957..7a8677a 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_act.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_act.go @@ -753,7 +753,9 @@ func securityInitiate(x *BleXport, bl *Listener, } // Blocking -func advStart(x *BleXport, bl *Listener, r *BleAdvStartReq) (uint16, error) { +func advStart(x *BleXport, bl *Listener, stopChan chan struct{}, + r *BleAdvStartReq) (uint16, error) { + const rspType = MSG_TYPE_ADV_START j, err := json.Marshal(r) @@ -794,6 +796,9 @@ func advStart(x *BleXport, bl *Listener, r *BleAdvStartReq) (uint16, error) { case <-bl.AfterTimeout(x.RspTimeout()): return 0, BhdTimeoutError(rspType, r.Seq) + + case <-stopChan: + return 0, fmt.Errorf("advertise aborted") } } } @@ -1185,3 +1190,25 @@ func setPreferredMtu(x *BleXport, bl *Listener, } } } + +func checkSync(x *BleXport, bl *Listener, r *BleSyncReq) (bool, error) { + j, err := json.Marshal(r) + if err != nil { + return false, err + } + + if err := x.txNoSync(j); err != nil { + return false, err + } + for { + select { + case err := <-bl.ErrChan: + return false, err + case bm := <-bl.MsgChan: + switch msg := bm.(type) { + case *BleSyncRsp: + return msg.Synced, nil + } + } + } +} diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_advertiser.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_advertiser.go index 90c85e0..67fde5e 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_advertiser.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_advertiser.go @@ -88,7 +88,7 @@ func (a *Advertiser) advertise(cfg adv.Cfg) (uint16, *Listener, error) { return 0, nil, err } - connHandle, err := advStart(a.bx, bl, r) + connHandle, err := advStart(a.bx, bl, a.stopChan, r) if err != nil { a.bx.RemoveListener(bl) if !nmxutil.IsXport(err) { @@ -122,7 +122,7 @@ func (a *Advertiser) stopAdvertising() error { func (a *Advertiser) buildSesn(cfg adv.Cfg, connHandle uint16, bl *Listener) ( sesn.Sesn, error) { - s, err := NewBleSesn(a.bx, cfg.Ble.SesnCfg) + s, err := NewBleSesn(a.bx, cfg.Ble.SesnCfg, MASTER_PRIO_CONNECT) if err != nil { return nil, err } diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go index b93a5e9..3d2a05e 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_scanner.go @@ -33,15 +33,18 @@ import ( "mynewt.apache.org/newtmgr/nmxact/xact" ) +const scanRetryRate = time.Second + // Implements scan.Scanner. type BleScanner struct { cfg scan.Cfg - bx *BleXport - discoverer *Discoverer - reportedDevs map[BleDev]string - bos *BleSesn - enabled bool + bx *BleXport + discoverer *Discoverer + reportedDevs map[BleDev]string + bos *BleSesn + enabled bool + suspendBlocker nmxutil.Blocker // Protects accesses to the reported devices map. mtx sync.Mutex @@ -64,11 +67,7 @@ func (s *BleScanner) discover() (*BleDev, error) { }) s.mtx.Unlock() - defer func() { - s.mtx.Lock() - defer s.mtx.Unlock() - s.discoverer = nil - }() + defer func() { s.discoverer = nil }() var dev *BleDev advRptCb := func(r BleAdvReport) { @@ -90,13 +89,13 @@ func (s *BleScanner) discover() (*BleDev, error) { func (s *BleScanner) connect(dev BleDev) error { s.cfg.SesnCfg.PeerSpec.Ble = dev - session, err := s.bx.BuildSesn(s.cfg.SesnCfg) + bs, err := NewBleSesn(s.bx, s.cfg.SesnCfg, MASTER_PRIO_SCAN) if err != nil { return err } s.mtx.Lock() - s.bos = session.(*BleSesn) + s.bos = bs s.mtx.Unlock() if err := s.bos.Open(); err != nil { @@ -116,7 +115,7 @@ func (s *BleScanner) readHwId() (string, error) { } if res.Status() != 0 { return "", - fmt.Errorf("failed to read hardware ID; NMP status=%discoverer", + fmt.Errorf("failed to read hardware ID; NMP status=%d", res.Status()) } cres := res.(*xact.ConfigReadResult) @@ -124,6 +123,10 @@ func (s *BleScanner) readHwId() (string, error) { } func (s *BleScanner) scan() (*scan.ScanPeer, error) { + // Ensure subsequent calls to suspend() block until scanning has stopped. + s.suspendBlocker.Start() + defer s.suspendBlocker.Unblock(nil) + // Discover the first device which matches the specified predicate. dev, err := s.discover() if err != nil { @@ -192,10 +195,7 @@ func (s *BleScanner) Start(cfg scan.Cfg) error { p, err := s.scan() if err != nil { log.Debugf("Scan error: %s", err.Error()) - if nmxutil.IsXport(err) { - // Transport stopped; abort the scan. - s.enabled = false - } + time.Sleep(scanRetryRate) } else if p != nil { s.mtx.Lock() s.reportedDevs[p.PeerSpec.Ble] = p.HwId @@ -209,31 +209,51 @@ func (s *BleScanner) Start(cfg scan.Cfg) error { return nil } -func (s *BleScanner) Stop() error { - if !s.enabled { - return nmxutil.NewAlreadyError("Attempt to stop BLE scanner twice") - } - s.enabled = false - - s.mtx.Lock() - defer s.mtx.Unlock() - +func (s *BleScanner) suspend() error { discoverer := s.discoverer bos := s.bos if discoverer != nil { discoverer.Stop() - s.discoverer = nil } if bos != nil { bos.Close() - s.bos = nil } + // Block until scan is fully terminated. + s.suspendBlocker.Wait(nmxutil.DURATION_FOREVER, nil) + + s.discoverer = nil + s.bos = nil + return nil } +// Aborts the current scan but leaves the scanner enabled. This function is +// called when a higher priority procedure (e.g., connecting) needs to acquire +// master privileges. When the high priority procedures are complete, scanning +// will resume. +func (s *BleScanner) Preempt() error { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.suspend() +} + +// Stops the scanner. Scanning won't resume unless Start() gets called. +func (s *BleScanner) Stop() error { + s.mtx.Lock() + defer s.mtx.Unlock() + + if !s.enabled { + return nmxutil.NewAlreadyError("Attempt to stop BLE scanner twice") + } + s.enabled = false + + return s.suspend() +} + // @return true if the specified device was found and // forgetten; // false if the specified device is unknown. diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_sesn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_sesn.go index 3138f39..763997d 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_sesn.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_sesn.go @@ -37,6 +37,7 @@ import ( type BleSesn struct { cfg sesn.SesnCfg bx *BleXport + prio MasterPrio conn *Conn mgmtChrs BleMgmtChrs txvr *mgmt.Transceiver @@ -47,7 +48,7 @@ type BleSesn struct { } func (s *BleSesn) init() error { - s.conn = NewConn(s.bx) + s.conn = NewConn(s.bx, s.prio) s.stopChan = make(chan struct{}) if s.txvr != nil { @@ -63,7 +64,9 @@ func (s *BleSesn) init() error { return nil } -func NewBleSesn(bx *BleXport, cfg sesn.SesnCfg) (*BleSesn, error) { +func NewBleSesn(bx *BleXport, cfg sesn.SesnCfg, prio MasterPrio) ( + *BleSesn, error) { + mgmtChrs, err := BuildMgmtChrs(cfg.MgmtProto) if err != nil { return nil, err @@ -72,6 +75,7 @@ func NewBleSesn(bx *BleXport, cfg sesn.SesnCfg) (*BleSesn, error) { s := &BleSesn{ cfg: cfg, bx: bx, + prio: prio, mgmtChrs: mgmtChrs, } @@ -85,13 +89,10 @@ func (s *BleSesn) AbortRx(seq uint8) error { return nil } +// Listens for disconnect in the background. func (s *BleSesn) disconnectListen() { - // Listen for disconnect in the background. s.wg.Add(1) go func() { - // If the session is being closed, unblock the close() call. - defer s.closeBlocker.Unblock(nil) - // Block until disconnect. err := <-s.conn.DisconnectChan() nmxutil.Assert(!s.IsOpen()) @@ -110,6 +111,9 @@ func (s *BleSesn) disconnectListen() { if s.cfg.OnCloseCb != nil { s.cfg.OnCloseCb(s, err) } + + // Finally, unblock the close() call, if any. + s.closeBlocker.Unblock(nil) }() } @@ -195,7 +199,7 @@ func (s *BleSesn) openOnce() (bool, error) { } // Ensure subsequent calls to Close() block. - s.closeBlocker.Block() + s.closeBlocker.Start() // Listen for disconnect in the background. s.disconnectListen() @@ -266,13 +270,14 @@ func (s *BleSesn) OpenConnected( if err := s.conn.Inherit(connHandle, eventListener); err != nil { if !nmxutil.IsSesnAlreadyOpen(err) { - s.closeBlocker.Unblock(nil) + //s.closeBlocker.Unblock(nil) + s.Close() } return err } // Ensure subsequent calls to Close() block. - s.closeBlocker.Block() + s.closeBlocker.Start() // Listen for disconnect in the background. s.disconnectListen() @@ -291,7 +296,7 @@ func (s *BleSesn) Close() error { } // Block until close completes. - s.closeBlocker.Wait(nmxutil.DURATION_FOREVER) + s.closeBlocker.Wait(nmxutil.DURATION_FOREVER, nil) return nil } diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go index 7bb1e6f..6c814de 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_util.go @@ -382,6 +382,14 @@ func NewFindChrReq() *BleFindChrReq { } } +func NewSyncReq() *BleSyncReq { + return &BleSyncReq{ + Op: MSG_OP_REQ, + Type: MSG_TYPE_SYNC, + Seq: NextSeq(), + } +} + func ConnFindXact(x *BleXport, connHandle uint16) (BleConnDesc, error) { r := NewBleConnFindReq() r.ConnHandle = connHandle @@ -797,3 +805,41 @@ func BuildMgmtChrs(mgmtProto sesn.MgmtProto) (BleMgmtChrs, error) { return mgmtChrs, nil } + +type MasterPrio int + +const ( + // Lower number = higher priority. + MASTER_PRIO_CONNECT = 0 + MASTER_PRIO_SCAN = 1 +) + +func AcquireMaster(bx *BleXport, prio MasterPrio, token interface{}) error { + switch prio { + case MASTER_PRIO_CONNECT: + return bx.AcquireMasterConnect(token) + + case MASTER_PRIO_SCAN: + return bx.AcquireMasterScan(token) + + default: + return fmt.Errorf("Invalid session priority: %+v", prio) + } +} + +func StopWaitingForMaster(bx *BleXport, prio MasterPrio, token interface{}, + err error) error { + + switch prio { + case MASTER_PRIO_CONNECT: + bx.StopWaitingForMasterConnect(token, err) + return nil + + case MASTER_PRIO_SCAN: + bx.StopWaitingForMasterScan(token, err) + return nil + + default: + return fmt.Errorf("Invalid session priority: %+v", prio) + } +} diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go index 03b158e..9261d21 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/ble_xport.go @@ -21,7 +21,6 @@ package nmble import ( "encoding/hex" - "encoding/json" "fmt" "sync" "time" @@ -88,7 +87,7 @@ func NewXportCfg() XportCfg { BlehostdRspTimeout: 10 * time.Second, Restart: true, SyncTimeout: 10 * time.Second, - PreferredMtu: 264, + PreferredMtu: 512, } } @@ -104,34 +103,38 @@ const ( // Implements xport.Xport. type BleXport struct { - cfg XportCfg - d *Dispatcher - client *unixchild.Client - state BleXportState - stopChan chan struct{} - shutdownChan chan bool - readyBcast nmxutil.Bcaster - master nmxutil.SingleResource - slave nmxutil.SingleResource - randAddr *BleAddr - mtx sync.Mutex - scanner *BleScanner - advertiser *Advertiser - cm ChrMgr - sesns map[uint16]*BleSesn + advertiser *Advertiser + cfg XportCfg + client *unixchild.Client + cm ChrMgr + d *Dispatcher + master Master + mtx sync.Mutex + randAddr *BleAddr + readyBcast nmxutil.Bcaster + scanner *BleScanner + sesns map[uint16]*BleSesn + shutdownBlocker nmxutil.Blocker + slave nmxutil.SingleResource + state BleXportState + stopChan chan struct{} + syncer Syncer + wg sync.WaitGroup } func NewBleXport(cfg XportCfg) (*BleXport, error) { bx := &BleXport{ - cfg: cfg, - d: NewDispatcher(), - shutdownChan: make(chan bool), - readyBcast: nmxutil.Bcaster{}, - master: nmxutil.NewSingleResource(), - slave: nmxutil.NewSingleResource(), - sesns: map[uint16]*BleSesn{}, + cfg: cfg, + d: NewDispatcher(), + readyBcast: nmxutil.Bcaster{}, + slave: nmxutil.NewSingleResource(), + sesns: map[uint16]*BleSesn{}, } + bx.advertiser = NewAdvertiser(bx) + bx.scanner = NewBleScanner(bx) + bx.master = NewMaster(bx, bx.scanner) + return bx, nil } @@ -166,37 +169,17 @@ func (bx *BleXport) BuildScanner() (scan.Scanner, error) { // The transport only allows a single scanner. This is because the // master privileges need to managed among the scanner and the // sessions. - if bx.scanner == nil { - bx.scanner = NewBleScanner(bx) - } - return bx.scanner, nil } func (bx *BleXport) BuildAdvertiser() (adv.Advertiser, error) { // The transport only allows a single advertiser. This is because the // slave privileges need to managed among all the advertise operations. - if bx.advertiser == nil { - bx.advertiser = NewAdvertiser(bx) - } - return bx.advertiser, nil } func (bx *BleXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) { - return NewBleSesn(bx, cfg) -} - -func (bx *BleXport) addSyncListener() (*Listener, error) { - key := TchKey(MSG_TYPE_SYNC_EVT, -1) - nmxutil.LogAddListener(3, key, 0, "sync") - return bx.AddListener(key) -} - -func (bx *BleXport) addResetListener() (*Listener, error) { - key := TchKey(MSG_TYPE_RESET_EVT, -1) - nmxutil.LogAddListener(3, key, 0, "reset") - return bx.AddListener(key) + return NewBleSesn(bx, cfg, MASTER_PRIO_CONNECT) } func (bx *BleXport) addAccessListener() (*Listener, error) { @@ -205,121 +188,83 @@ func (bx *BleXport) addAccessListener() (*Listener, error) { return bx.AddListener(key) } -func (bx *BleXport) querySyncStatus() (bool, error) { - req := &BleSyncReq{ - Op: MSG_OP_REQ, - Type: MSG_TYPE_SYNC, - Seq: NextSeq(), - } +func (bx *BleXport) shutdown(restart bool, err error) { + nmxutil.Assert(nmxutil.IsXport(err)) - j, err := json.Marshal(req) - if err != nil { - return false, err - } + // Prevents repeated shutdowns without keeping the mutex locked throughout + // the duration of the shutdown. + // + // @return bool true if a shutdown was successfully initiated. + initiate := func() bool { + bx.mtx.Lock() + defer bx.mtx.Unlock() - key := SeqKey(req.Seq) - bl, err := bx.AddListener(key) - if err != nil { - return false, err - } - defer bx.RemoveListener(bl) + if bx.state == BLE_XPORT_STATE_STARTED || + bx.state == BLE_XPORT_STATE_STARTING { - if err := bx.txNoSync(j); err != nil { - return false, err - } - for { - select { - case err := <-bl.ErrChan: - return false, err - case bm := <-bl.MsgChan: - switch msg := bm.(type) { - case *BleSyncRsp: - return msg.Synced, nil - } + bx.state = BLE_XPORT_STATE_STOPPING + return true + } else { + return false } } -} - -func (bx *BleXport) initialSyncCheck() (bool, *Listener, error) { - bl, err := bx.addSyncListener() - if err != nil { - return false, nil, err - } - - synced, err := bx.querySyncStatus() - if err != nil { - bx.RemoveListener(bl) - return false, nil, err - } - - return synced, bl, nil -} - -func (bx *BleXport) shutdown(restart bool, err error) { - nmxutil.Assert(nmxutil.IsXport(err)) - log.Debugf("Shutting down BLE transport") - - bx.mtx.Lock() - - var fullyStarted bool - var already bool + go func() { + log.Debugf("Shutting down BLE transport") - switch bx.state { - case BLE_XPORT_STATE_STARTED: - already = false - fullyStarted = true - case BLE_XPORT_STATE_STARTING: - already = false - fullyStarted = false - default: - already = true - } + success := initiate() + if !success { + // Shutdown already in progress. + return + } - if !already { - bx.state = BLE_XPORT_STATE_STOPPING - } + bx.sesns = map[uint16]*BleSesn{} - bx.mtx.Unlock() + // Indicate error to all clients who are waiting for the master + // resource. + log.Debugf("Aborting BLE master") + bx.master.Abort(err) - if already { - // Shutdown already in progress. - return - } + // Indicate an error to all of this transport's listeners. This + // prevents them from blocking endlessly while awaiting a BLE message. + log.Debugf("Stopping BLE dispatcher") + bx.d.ErrorAll(err) - bx.sesns = map[uint16]*BleSesn{} + synced, err := bx.syncer.Refresh() + if err == nil && synced { + // Reset controller so that all outstanding connections terminate. + ResetXact(bx) + } - // Indicate error to all clients who are waiting for the master resource. - log.Debugf("Aborting BLE master") - bx.master.Abort(err) + bx.syncer.Stop() - // Indicate an error to all of this transport's listeners. This prevents - // them from blocking endlessly while awaiting a BLE message. - log.Debugf("Stopping BLE dispatcher") - bx.d.ErrorAll(err) + // Stop all of this transport's go routines. + close(bx.stopChan) + bx.wg.Wait() - synced, err := bx.querySyncStatus() - if err == nil && synced { - // Reset controller so that all outstanding connections terminate. - ResetXact(bx) - } + // Stop the unixchild instance (blehostd + socket). + if bx.client != nil { + log.Debugf("Stopping unixchild") + bx.client.Stop() + } - // Stop all of this transport's go routines. - close(bx.stopChan) + bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED) - // Stop the unixchild instance (blehostd + socket). - if bx.client != nil { - log.Debugf("Stopping unixchild") - bx.client.Stop() - } + // Indicate that the shutdown is complete. If restarts are enabled on + // this transport, this signals that the transport should be started + // again. + bx.shutdownBlocker.UnblockAndRestart(restart) + }() +} - bx.setStateFrom(BLE_XPORT_STATE_STOPPING, BLE_XPORT_STATE_STOPPED) +func (bx *BleXport) waitForShutdown() bool { + itf, _ := bx.shutdownBlocker.Wait(nmxutil.DURATION_FOREVER, nil) + return itf.(bool) +} - // Indicate that the shutdown is complete. If restarts are enabled on this - // transport, this signals that the transport should be started again. - if fullyStarted { - bx.shutdownChan <- restart - } +func (bx *BleXport) blockingShutdown(restart bool, err error) { + bx.shutdown(restart, err) + bx.waitForShutdown() } func (bx *BleXport) blockUntilReady() error { @@ -379,7 +324,7 @@ func (bx *BleXport) setStateFrom(from BleXportState, to BleXportState) bool { } func (bx *BleXport) Stop() error { - bx.shutdown(false, nmxutil.NewXportError("xport stopped")) + bx.blockingShutdown(false, nmxutil.NewXportError("xport stopped")) return nil } @@ -391,12 +336,15 @@ func (bx *BleXport) startOnce() error { bx.stopChan = make(chan struct{}) if err := bx.startUnixChild(); err != nil { - bx.shutdown(true, err) + bx.blockingShutdown(true, err) return err } // Listen for errors and data from the blehostd process. + bx.wg.Add(1) go func() { + defer bx.wg.Done() + for { select { case err := <-bx.client.ErrChild: @@ -416,49 +364,28 @@ func (bx *BleXport) startOnce() error { } }() - synced, syncl, err := bx.initialSyncCheck() - if err != nil { - bx.shutdown(true, err) + if err := bx.syncer.Start(bx); err != nil { + bx.blockingShutdown(true, err) return err } // Block until host and controller are synced. - if !synced { - SyncLoop: - for { - select { - case err := <-syncl.ErrChan: - bx.shutdown(true, err) - return err - case bm := <-syncl.MsgChan: - switch msg := bm.(type) { - case *BleSyncEvt: - if msg.Synced { - break SyncLoop - } - } - case <-time.After(bx.cfg.SyncTimeout): - err := nmxutil.NewXportError( - "Timeout waiting for host <-> controller sync") - bx.shutdown(true, err) - return err - case <-bx.stopChan: - return nmxutil.NewXportError("Transport startup aborted") - } - } + if err := bx.syncer.BlockUntilSynced( + bx.cfg.SyncTimeout, bx.stopChan); err != nil { + + err = nmxutil.NewXportError( + "Error waiting for host <-> controller sync: " + err.Error()) + bx.blockingShutdown(true, err) + return err } // Host and controller are synced. Listen for events in the background: // * sync loss // * stack reset // * GATT access + bx.wg.Add(1) go func() { - resetl, err := bx.addResetListener() - if err != nil { - bx.shutdown(true, err) - return - } - defer bx.RemoveListener(resetl) + defer bx.wg.Done() accessl, err := bx.addAccessListener() if err != nil { @@ -469,24 +396,8 @@ func (bx *BleXport) startOnce() error { for { select { - case err := <-syncl.ErrChan: - bx.shutdown(true, err) - return - case bm := <-syncl.MsgChan: - switch msg := bm.(type) { - case *BleSyncEvt: - if !msg.Synced { - bx.shutdown(true, nmxutil.NewXportError( - "BLE host <-> controller sync lost")) - } - } - - case err := <-resetl.ErrChan: - bx.shutdown(true, err) - return - case bm := <-resetl.MsgChan: - switch msg := bm.(type) { - case *BleResetEvt: + case reasonItf, ok := <-bx.syncer.ListenReset(): + if ok { // Only process the reset event if the transport is not // already shutting down. If in mid-shutdown, the reset // event was likely elicited by the shutdown itself. @@ -494,23 +405,36 @@ func (bx *BleXport) startOnce() error { if state == BLE_XPORT_STATE_STARTING || state == BLE_XPORT_STATE_STARTED { + reason := reasonItf.(int) bx.shutdown(true, nmxutil.NewXportError(fmt.Sprintf( "The BLE controller has been reset by the host; "+ "reason=%s (%d)", - ErrCodeToString(msg.Reason), msg.Reason))) - return + ErrCodeToString(reason), reason))) } } - case err := <-accessl.ErrChan: - bx.shutdown(true, err) - return - case bm := <-accessl.MsgChan: - switch msg := bm.(type) { - case *BleAccessEvt: - if err := bx.cm.Access(bx, msg); err != nil { - log.Debugf("Error sending access status: %s", - err.Error()) + case syncedItf, ok := <-bx.syncer.ListenSync(): + if ok { + synced := syncedItf.(bool) + if !synced { + bx.shutdown(true, nmxutil.NewXportError( + "BLE host <-> controller sync lost")) + } + } + + case err, ok := <-accessl.ErrChan: + if ok { + bx.shutdown(true, err) + } + + case bm, ok := <-accessl.MsgChan: + if ok { + switch msg := bm.(type) { + case *BleAccessEvt: + if err := bx.cm.Access(bx, msg); err != nil { + log.Debugf("Error sending access status: %s", + err.Error()) + } } } @@ -524,7 +448,7 @@ func (bx *BleXport) startOnce() error { if bx.randAddr == nil { addr, err := GenRandAddrXact(bx) if err != nil { - bx.shutdown(true, err) + bx.blockingShutdown(true, err) return err } @@ -533,20 +457,19 @@ func (bx *BleXport) startOnce() error { // Set the random address on the controller. if err := SetRandAddrXact(bx, *bx.randAddr); err != nil { - bx.shutdown(true, err) + bx.blockingShutdown(true, err) return err } // Set the preferred ATT MTU in the host. if err := SetPreferredMtuXact(bx, bx.cfg.PreferredMtu); err != nil { - bx.shutdown(true, err) + bx.blockingShutdown(true, err) return err } if !bx.setStateFrom(BLE_XPORT_STATE_STARTING, BLE_XPORT_STATE_STARTED) { - bx.shutdown(true, err) - return nmxutil.NewXportError( - "Internal error; BLE transport in unexpected state") + bx.blockingShutdown(true, nmxutil.NewXportError( + "Internal error; BLE transport in unexpected state")) } return nil @@ -557,6 +480,8 @@ func (bx *BleXport) Start() error { return nmxutil.NewXportError("BLE xport started twice") } + bx.shutdownBlocker.Start() + // Try to start the transport. If this first attempt fails, report the // error and don't retry. if err := bx.startOnce(); err != nil { @@ -567,10 +492,12 @@ func (bx *BleXport) Start() error { } // Now that the first start attempt has succeeded, start a restart loop in - // the background. + // the background. This Go routine does not participate in the wait group + // because it terminates itself independent of the others. go func() { // Block until transport shuts down. - restart := <-bx.shutdownChan + restart := bx.waitForShutdown() + for { // If restarts are disabled, or if the shutdown was a result of an // explicit stop call (instead of an unexpected error), stop @@ -592,7 +519,7 @@ func (bx *BleXport) Start() error { err.Error()) } else { // Success. Block until the transport shuts down. - restart = <-bx.shutdownChan + restart = bx.waitForShutdown() } } }() @@ -640,16 +567,24 @@ func (bx *BleXport) RspTimeout() time.Duration { return bx.cfg.BlehostdRspTimeout } -func (bx *BleXport) AcquireMaster(token interface{}) error { - return bx.master.Acquire(token) +func (bx *BleXport) AcquireMasterConnect(token interface{}) error { + return bx.master.AcquireConnect(token) +} + +func (bx *BleXport) AcquireMasterScan(token interface{}) error { + return bx.master.AcquireScan(token) } func (bx *BleXport) ReleaseMaster() { bx.master.Release() } -func (bx *BleXport) StopWaitingForMaster(token interface{}, err error) { - bx.master.StopWaiting(token, err) +func (bx *BleXport) StopWaitingForMasterConnect(token interface{}, err error) { + bx.master.StopWaitingConnect(token, err) +} + +func (bx *BleXport) StopWaitingForMasterScan(token interface{}, err error) { + bx.master.StopWaitingScan(token, err) } func (bx *BleXport) AcquireSlave(token interface{}) error { diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/conn.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/conn.go index 40acaa0..c8a98bf 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/conn.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/conn.go @@ -31,6 +31,7 @@ func NewNotifyListener() *NotifyListener { type Conn struct { bx *BleXport + prio MasterPrio rxvr *Receiver attMtu uint16 profile Profile @@ -38,13 +39,13 @@ type Conn struct { connHandle uint16 connecting bool - stopped bool disconnectChan chan error wg sync.WaitGroup encBlocker nmxutil.Blocker - // Terminates all go routines. Gets set to null after disconnect. + // Terminates all go routines. stopChan chan struct{} + stopped bool notifyMap map[*Characteristic]*NotifyListener @@ -56,9 +57,10 @@ type Conn struct { mtx sync.Mutex } -func NewConn(bx *BleXport) *Conn { +func NewConn(bx *BleXport, prio MasterPrio) *Conn { return &Conn{ bx: bx, + prio: prio, rxvr: NewReceiver(nmxutil.GetNextId(), bx, 1), connHandle: BLE_CONN_HANDLE_NONE, attMtu: BLE_ATT_MTU_DFLT, @@ -72,19 +74,6 @@ func (c *Conn) DisconnectChan() <-chan error { return c.disconnectChan } -func (c *Conn) initiateShutdown() bool { - c.mtx.Lock() - defer c.mtx.Unlock() - - if c.stopped { - return false - } - c.stopped = true - - close(c.stopChan) - return true -} - func (c *Conn) abortNotifyListeners(err error) { // No need to lock mutex; this should only be called after all go routines // have terminated. @@ -95,25 +84,47 @@ func (c *Conn) abortNotifyListeners(err error) { } } -func (c *Conn) shutdown(err error) { - if !c.initiateShutdown() { - return +func (c *Conn) shutdown(delay time.Duration, err error) { + // Returns true if a shutdown was successfully initiated. Prevents + // repeated shutdowns without keeping the mutex locked throughout the + // duration of the shutdown. + initiate := func() bool { + c.mtx.Lock() + defer c.mtx.Unlock() + + if c.stopped { + return false + } + c.stopped = true + + close(c.stopChan) + return true } - c.connecting = false - c.connHandle = BLE_CONN_HANDLE_NONE + go func() { + if delay > 0 { + time.Sleep(delay) + } + + if !initiate() { + return + } - c.bx.StopWaitingForMaster(c, err) + StopWaitingForMaster(c.bx, c.prio, c, err) - c.rxvr.RemoveAll("shutdown") - c.rxvr.WaitUntilNoListeners() + c.rxvr.RemoveAll("shutdown") + c.rxvr.WaitUntilNoListeners() - c.wg.Wait() + c.wg.Wait() - c.abortNotifyListeners(err) + c.connecting = false + c.connHandle = BLE_CONN_HANDLE_NONE - c.disconnectChan <- err - close(c.disconnectChan) + c.abortNotifyListeners(err) + + c.disconnectChan <- err + close(c.disconnectChan) + }() } func (c *Conn) newDisconnectError(reason int) error { @@ -138,11 +149,10 @@ func (c *Conn) eventListen(bl *Listener) error { return case err, ok := <-bl.ErrChan: - if !ok { - return + if ok { + c.shutdown(0, err) } - - go c.shutdown(err) + return case bm, ok := <-bl.MsgChan: if !ok { @@ -179,7 +189,7 @@ func (c *Conn) eventListen(bl *Listener) error { c.encBlocker.Unblock(err) case *BleDisconnectEvt: - go c.shutdown(c.newDisconnectError(msg.Reason)) + c.shutdown(0, c.newDisconnectError(msg.Reason)) return default: @@ -252,10 +262,6 @@ func (c *Conn) startConnecting() error { c.mtx.Lock() defer c.mtx.Unlock() - if c.stopChan == nil { - return fmt.Errorf("Attempt to re-use conn object") - } - if c.connHandle != BLE_CONN_HANDLE_NONE { return nmxutil.NewSesnAlreadyOpenError( "BLE connection already established") @@ -332,7 +338,7 @@ func (c *Conn) Connect(bx *BleXport, ownAddrType BleAddrType, peer BleDev, r.PeerAddr = peer.Addr // Initiating a connection requires dedicated master privileges. - if err := c.bx.AcquireMaster(c); err != nil { + if err := AcquireMaster(c.bx, c.prio, c); err != nil { return err } defer c.bx.ReleaseMaster() @@ -685,12 +691,12 @@ func (c *Conn) InitiateSecurity() error { } defer c.rxvr.RemoveListener("security-initiate", bl) - c.encBlocker.Block() + c.encBlocker.Start() if err := securityInitiate(c.bx, bl, r); err != nil { return err } - encErr, tmoErr := c.encBlocker.Wait(time.Second * 15) + encErr, tmoErr := c.encBlocker.Wait(time.Second*15, c.stopChan) if encErr != nil { return encErr.(error) } @@ -747,19 +753,30 @@ func (c *Conn) Stop() error { c.mtx.Lock() defer c.mtx.Unlock() + var shutdownDelay time.Duration + var shutdownErr error + if c.connHandle != BLE_CONN_HANDLE_NONE { // Terminate the connection. On success, the conn object will shut // down upon receipt of the disconnect event. On failure, just force a // shutdown manually. if err := c.terminate(); err != nil { - go c.shutdown(err) + shutdownDelay = 0 + shutdownErr = err + } else { + // Force a shutdown in 10 seconds in case we never receive a + // disconnect event. + shutdownDelay = 10 * time.Second + shutdownErr = fmt.Errorf("forced shutdown; disconnect timeout") } } else { if c.connecting { c.connCancel() } - go c.shutdown(fmt.Errorf("Stopped")) + shutdownDelay = 0 + shutdownErr = fmt.Errorf("Stopped before connect complete") } + c.shutdown(shutdownDelay, shutdownErr) return nil } diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go index d41dc12..1b191f9 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/discover.go @@ -71,7 +71,7 @@ func (d *Discoverer) Start(advRptCb BleAdvRptFn) error { } // Scanning requires dedicated master privileges. - if err := d.params.Bx.AcquireMaster(d); err != nil { + if err := AcquireMaster(d.params.Bx, MASTER_PRIO_SCAN, d); err != nil { return err } defer d.params.Bx.ReleaseMaster() diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/master.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/master.go new file mode 100644 index 0000000..ef26fb4 --- /dev/null +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/master.go @@ -0,0 +1,135 @@ +package nmble + +import ( + "fmt" + "sync" + + "mynewt.apache.org/newtmgr/nmxact/nmxutil" +) + +// Represents the Bluetooth device's "master privileges." The device can only +// do one of the following actions at a time: +// * initiate connection +// * scan +// +// ("connector": client who wants to connect) +// ("scanner": client who wants to scan) +// +// This struct restricts master privileges to a single client at a time. It +// uses the following procedure to determine which of several clients to serve: +// If there is one or more waiting connectors: +// If a scanner is active, suspend it. +// Service the connectors in the order of their requests. +// Else (no waiting connectors): +// Service waiting scanner if there is one. +type Master struct { + res nmxutil.SingleResource + scanner *BleScanner + scanWait chan error + scanAcq chan struct{} + mtx sync.Mutex +} + +func NewMaster(x *BleXport, s *BleScanner) Master { + return Master{ + res: nmxutil.NewSingleResource(), + scanner: s, + scanAcq: make(chan struct{}), + } +} + +// Unblocks a waiting scanner. +func (m *Master) unblockScanner(err error) bool { + if m.scanWait == nil { + return false + } + + m.scanWait <- err + close(m.scanWait) + m.scanWait = nil + return true +} + +func (m *Master) AcquireConnect(token interface{}) error { + // Stop the scanner in case it is active; connections take priority. + m.scanner.Preempt() + + m.mtx.Lock() + defer m.mtx.Unlock() + + return m.res.Acquire(token) +} + +func (m *Master) AcquireScan(token interface{}) error { + m.mtx.Lock() + + // If the resource is unused, just acquire it. + if !m.res.Acquired() { + err := m.res.Acquire(token) + m.mtx.Unlock() + return err + } + + // Otherwise, wait until no one wants to connect. + if m.scanWait != nil { + m.mtx.Unlock() + return fmt.Errorf("Scanner already waiting for master privileges") + } + m.scanWait = make(chan error) + + m.mtx.Unlock() + + // Now we have to wait until someone releases the resource. When this + // happens, let the releaser know when the scanner has finished acquiring + // the resource. At that time, the call to Release() can unlock and + // return. + defer func() { m.scanAcq <- struct{}{} }() + + // Wait for the resource to be released. + if err := <-m.scanWait; err != nil { + return err + } + + return m.res.Acquire(token) +} + +func (m *Master) Release() { + m.mtx.Lock() + defer m.mtx.Unlock() + + if m.res.Release() { + // Next waiting connector acquired the resource. + return + } + + // No pending connects; hand resource to scanner if it wants it. + if m.unblockScanner(nil) { + // Don't return until scanner has fully acquired the resource. + <-m.scanAcq + } +} + +// Removes the specified connector from the wait queue. +func (m *Master) StopWaitingConnect(token interface{}, err error) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.res.StopWaiting(token, err) +} + +// Removes the specified scanner from the wait queue. +func (m *Master) StopWaitingScan(token interface{}, err error) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.unblockScanner(err) +} + +// Releases the resource and clears the wait queue. +func (m *Master) Abort(err error) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.unblockScanner(err) + m.res.Abort(err) +} diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/sync.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/sync.go new file mode 100644 index 0000000..95cae0f --- /dev/null +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmble/sync.go @@ -0,0 +1,202 @@ +package nmble + +import ( + "fmt" + "sync" + "time" + + "mynewt.apache.org/newtmgr/nmxact/nmxutil" +) + +const syncPollRate = time.Second + +type Syncer struct { + x *BleXport + stopCh chan struct{} + wg sync.WaitGroup + synced bool + syncBlocker nmxutil.Blocker + mtx sync.Mutex + + resetBcaster nmxutil.Bcaster + syncBcaster nmxutil.Bcaster +} + +func (s *Syncer) Refresh() (bool, error) { + r := NewSyncReq() + bl, err := s.x.AddListener(SeqKey(r.Seq)) + if err != nil { + return false, err + } + defer s.x.RemoveListener(bl) + + synced, err := checkSync(s.x, bl, r) + if err != nil { + return false, err + } + + s.setSynced(synced) + return synced, nil +} + +func (s *Syncer) checkSyncLoop() { + doneCh := make(chan struct{}) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + s.BlockUntilSynced(nmxutil.DURATION_FOREVER, s.stopCh) + close(doneCh) + }() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + for { + s.Refresh() + + select { + case <-doneCh: + return + + case <-s.stopCh: + return + + case <-time.After(syncPollRate): + } + } + }() +} + +func (s *Syncer) setSynced(synced bool) { + s.mtx.Lock() + defer s.mtx.Unlock() + + if synced == s.synced { + return + } + + s.synced = synced + if s.synced { + s.syncBlocker.Unblock(nil) + } else { + s.syncBlocker.Start() + + // Listen for sync loss and reset in the background. + s.checkSyncLoop() + } + s.syncBcaster.Send(s.synced) +} + +func (s *Syncer) addSyncListener() (*Listener, error) { + key := TchKey(MSG_TYPE_SYNC_EVT, -1) + nmxutil.LogAddListener(3, key, 0, "sync") + return s.x.AddListener(key) +} + +func (s *Syncer) addResetListener() (*Listener, error) { + key := TchKey(MSG_TYPE_RESET_EVT, -1) + nmxutil.LogAddListener(3, key, 0, "reset") + return s.x.AddListener(key) +} + +func (s *Syncer) listen() error { + errChan := make(chan error) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + // Initial actions can cause an error to be returned. + syncl, err := s.addSyncListener() + if err != nil { + errChan <- err + close(errChan) + return + } + defer s.x.RemoveListener(syncl) + + resetl, err := s.addResetListener() + if err != nil { + errChan <- err + close(errChan) + return + } + defer s.x.RemoveListener(resetl) + + // Initial actions complete. + close(errChan) + + for { + select { + case <-syncl.ErrChan: + // XXX + case bm := <-syncl.MsgChan: + switch msg := bm.(type) { + case *BleSyncEvt: + s.setSynced(msg.Synced) + } + + case <-resetl.ErrChan: + // XXX + case bm := <-resetl.MsgChan: + switch msg := bm.(type) { + case *BleResetEvt: + s.setSynced(false) + s.resetBcaster.Send(msg.Reason) + } + + case <-s.stopCh: + return + } + } + }() + + return <-errChan +} + +func (s *Syncer) shutdown() { + s.syncBcaster.Clear() + s.resetBcaster.Clear() + s.syncBlocker.Unblock(nil) + + s.stopCh = nil +} + +func (s *Syncer) Start(x *BleXport) error { + s.x = x + s.stopCh = make(chan struct{}) + s.syncBlocker.Start() + s.checkSyncLoop() + return s.listen() +} + +func (s *Syncer) Stop() error { + if s.stopCh == nil { + return fmt.Errorf("Syncer already stopped") + } + + close(s.stopCh) + s.wg.Wait() + + s.shutdown() + + return nil +} + +func (s *Syncer) BlockUntilSynced(timeout time.Duration, + stopChan <-chan struct{}) error { + + _, err := s.syncBlocker.Wait(timeout, stopChan) + return err +} + +func (s *Syncer) ListenSync() chan interface{} { + return s.syncBcaster.Listen() +} + +func (s *Syncer) ListenReset() chan interface{} { + return s.resetBcaster.Listen() +} diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/block.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/block.go index 45b2e99..3ff5093 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/block.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/block.go @@ -33,7 +33,23 @@ type Blocker struct { val interface{} } -func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) { +func (b *Blocker) unblockNoLock(val interface{}) { + if b.ch != nil { + b.val = val + close(b.ch) + b.ch = nil + } +} + +func (b *Blocker) startNoLock() { + if b.ch == nil { + b.ch = make(chan struct{}) + } +} + +func (b *Blocker) Wait(timeout time.Duration, stopChan <-chan struct{}) ( + interface{}, error) { + b.mtx.Lock() ch := b.ch b.mtx.Unlock() @@ -42,6 +58,10 @@ func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) { return b.val, nil } + if stopChan == nil { + stopChan = make(chan struct{}) + } + timer := time.NewTimer(timeout) select { case <-ch: @@ -49,25 +69,29 @@ func (b *Blocker) Wait(timeout time.Duration) (interface{}, error) { return b.val, nil case <-timer.C: return nil, fmt.Errorf("timeout after %s", timeout.String()) + case <-stopChan: + return nil, fmt.Errorf("aborted") } } -func (b *Blocker) Block() { +func (b *Blocker) Start() { b.mtx.Lock() defer b.mtx.Unlock() - if b.ch == nil { - b.ch = make(chan struct{}) - } + b.startNoLock() } func (b *Blocker) Unblock(val interface{}) { b.mtx.Lock() defer b.mtx.Unlock() - if b.ch != nil { - b.val = val - close(b.ch) - b.ch = nil - } + b.unblockNoLock(val) +} + +func (b *Blocker) UnblockAndRestart(val interface{}) { + b.mtx.Lock() + defer b.mtx.Unlock() + + b.unblockNoLock(val) + b.startNoLock() } diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/sres.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/sres.go index 57a9137..b927208 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/sres.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/nmxutil/sres.go @@ -65,19 +65,21 @@ func (s *SingleResource) Acquire(token interface{}) error { return nil } -func (s *SingleResource) Release() { +// @return true if a pending waiter acquired the resource; +// false if the resource is now free. +func (s *SingleResource) Release() bool { s.mtx.Lock() if !s.acquired { panic("SingleResource release without acquire") s.mtx.Unlock() - return + return false } if len(s.waitQueue) == 0 { s.acquired = false s.mtx.Unlock() - return + return false } w := s.waitQueue[0] @@ -86,6 +88,8 @@ func (s *SingleResource) Release() { s.mtx.Unlock() w.c <- nil + + return true } func (s *SingleResource) StopWaiting(token interface{}, err error) { @@ -109,3 +113,10 @@ func (s *SingleResource) Abort(err error) { } s.waitQueue = nil } + +func (s *SingleResource) Acquired() bool { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.acquired +} diff --git a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go index 3a00595..ac07855 100644 --- a/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go +++ b/newtmgr/vendor/mynewt.apache.org/newtmgr/nmxact/scan/scan.go @@ -20,6 +20,8 @@ package scan import ( + "fmt" + "mynewt.apache.org/newtmgr/nmxact/bledefs" "mynewt.apache.org/newtmgr/nmxact/sesn" ) @@ -29,6 +31,10 @@ type ScanPeer struct { PeerSpec sesn.PeerSpec } +func (p *ScanPeer) String() string { + return fmt.Sprintf("%s, %s", p.PeerSpec.Ble.String(), p.HwId) +} + type ScanFn func(peer ScanPeer) type CfgBle struct { -- To stop receiving notification emails like this one, please contact "commits@mynewt.apache.org" .