servicecomb-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] little-cui closed pull request #325: SCB-472 Null point reference in zipkin plugin
Date Fri, 13 Apr 2018 03:59:58 GMT
little-cui closed pull request #325: SCB-472 Null point reference in zipkin plugin
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/325
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 02786d95..d468dbe2 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -108,6 +108,12 @@ log_format = text
 # whether enable record syslog
 log_sys = false
 
+###################################################################
+# Frontend Configurations
+###################################################################
+frontend_host_ip=127.0.0.1
+frontend_host_port=30103
+
 ###################################################################
 # above is the global configurations
 # you can overide above configuration in specific env
@@ -120,9 +126,3 @@ logfile = ./service-center.log
 [dev]
 loglevel = DEBUG
 logfile = ""
-
-###################################################################
-# Frontend Configurations
-###################################################################
-frontend_host_ip=127.0.0.1
-frontend_host_port=30103
diff --git a/main.go b/main.go
index d76e34cb..3d7ac976 100644
--- a/main.go
+++ b/main.go
@@ -18,8 +18,18 @@ package main
 
 // plugins
 import _ "github.com/apache/incubator-servicecomb-service-center/server/bootstrap"
-import "github.com/apache/incubator-servicecomb-service-center/server"
+import (
+	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	"github.com/apache/incubator-servicecomb-service-center/server"
+	"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
+)
 
 func main() {
 	server.Run()
+
+	util.GoCloseAndWait()
+
+	backend.Registry().Close()
+
+	util.Logger().Warn("service center exited", nil)
 }
diff --git a/pkg/async/async_task.go b/pkg/async/async_task.go
index 14727a17..73ceb384 100644
--- a/pkg/async/async_task.go
+++ b/pkg/async/async_task.go
@@ -39,6 +39,7 @@ type scheduler struct {
 	queue      *util.UniQueue
 	latestTask AsyncTask
 	once       sync.Once
+	goroutine  *util.GoRoutine
 }
 
 func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
@@ -47,7 +48,7 @@ func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
 	}
 
 	s.once.Do(func() {
-		go s.do()
+		s.goroutine.Do(s.do)
 	})
 
 	err = s.queue.Put(ctx, task)
@@ -57,15 +58,17 @@ func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
 	return s.latestTask.Err()
 }
 
-func (s *scheduler) do() {
+func (s *scheduler) do(ctx context.Context) {
 	for {
 		select {
+		case <-ctx.Done():
+			return
 		case task, ok := <-s.queue.Chan():
 			if !ok {
 				return
 			}
 			at := task.(AsyncTask)
-			at.Do(context.Background())
+			at.Do(ctx)
 			s.latestTask = at
 		}
 	}
@@ -73,6 +76,15 @@ func (s *scheduler) do() {
 
 func (s *scheduler) Close() {
 	s.queue.Close()
+	s.goroutine.Close(true)
+}
+
+func newScheduler(task AsyncTask) *scheduler {
+	return &scheduler{
+		queue:      util.NewUniQueue(),
+		latestTask: task,
+		goroutine:  util.NewGo(context.Background()),
+	}
 }
 
 type AsyncTaskService struct {
@@ -99,10 +111,7 @@ func (lat *AsyncTaskService) getOrNewScheduler(task AsyncTask) (s *scheduler, is
 		s, ok = lat.schedules[key]
 		if !ok {
 			isNew = true
-			s = &scheduler{
-				queue:      util.NewUniQueue(),
-				latestTask: task,
-			}
+			s = newScheduler(task)
 			lat.schedules[key] = s
 		}
 		lat.lock.Unlock()
@@ -166,11 +175,11 @@ func (lat *AsyncTaskService) LatestHandled(key string) (AsyncTask, error) {
 	return s.latestTask, nil
 }
 
-func (lat *AsyncTaskService) daemon(stopCh <-chan struct{}) {
+func (lat *AsyncTaskService) daemon(ctx context.Context) {
 	util.SafeCloseChan(lat.ready)
 	for {
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			util.Logger().Debugf("daemon thread exited for AsyncTaskService is stopped")
 			return
 		case <-time.After(DEFAULT_REMOVE_TASKS_INTERVAL):
@@ -228,7 +237,7 @@ func NewAsyncTaskService() *AsyncTaskService {
 	return &AsyncTaskService{
 		schedules:   make(map[string]*scheduler, DEFAULT_MAX_SCHEDULE_COUNT),
 		removeTasks: make(map[string]struct{}, DEFAULT_MAX_SCHEDULE_COUNT),
-		goroutine:   util.NewGo(make(chan struct{})),
+		goroutine:   util.NewGo(context.Background()),
 		ready:       make(chan struct{}),
 		isClose:     true,
 	}
diff --git a/pkg/chain/callback.go b/pkg/chain/callback.go
index 9b2e7fa2..36577402 100644
--- a/pkg/chain/callback.go
+++ b/pkg/chain/callback.go
@@ -47,11 +47,7 @@ func (cb *Callback) Invoke(r Result) {
 }
 
 func syncInvoke(f func(r Result), r Result) {
-	defer func() {
-		if itf := recover(); itf != nil {
-			util.LogPanic(itf)
-		}
-	}()
+	defer util.RecoverAndReport()
 	if f == nil {
 		util.Logger().Errorf(nil, "Callback function is nil. result: %s,", r)
 		return
diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go
index f54ef6f5..d1a35b94 100644
--- a/pkg/etcdsync/mutex.go
+++ b/pkg/etcdsync/mutex.go
@@ -134,7 +134,8 @@ func (m *DLock) Lock(wait bool) error {
 		util.Logger().Warnf(err, "Key %s is locked, waiting for other node releases it, id=%s", m.builder.key, m.id)
 
 		ctx, cancel := context.WithTimeout(m.builder.ctx, DEFAULT_LOCK_TTL*time.Second)
-		go func() {
+		util.Go(func(context.Context) {
+			defer cancel()
 			err := backend.Registry().Watch(ctx,
 				registry.WithStrKey(m.builder.key),
 				registry.WithWatchCallback(
@@ -146,10 +147,9 @@ func (m *DLock) Lock(wait bool) error {
 						return nil
 					}))
 			if err != nil {
-				util.Logger().Errorf(nil, "%s, key=%s, id=%s", err.Error(), m.builder.key, m.id)
+				util.Logger().Warnf(nil, "%s, key=%s, id=%s", err.Error(), m.builder.key, m.id)
 			}
-			cancel()
-		}()
+		})
 		select {
 		case <-ctx.Done():
 			continue // 可以重新尝试获取锁
diff --git a/pkg/grace/grace.go b/pkg/grace/grace.go
index fef6cafd..3ec7cd35 100644
--- a/pkg/grace/grace.go
+++ b/pkg/grace/grace.go
@@ -20,6 +20,7 @@ import (
 	"flag"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	"golang.org/x/net/context"
 	"os"
 	"os/exec"
 	"os/signal"
@@ -71,7 +72,7 @@ func Init() {
 		flag.Parse()
 	}
 
-	go handleSignals()
+	util.Go(handleSignals)
 }
 
 func Before(f func()) {
@@ -111,26 +112,28 @@ func fireSignalHook(ppFlag int, sig os.Signal) {
 	}
 }
 
-func handleSignals() {
+func handleSignals(ctx context.Context) {
 	var sig os.Signal
 
 	sigCh := make(chan os.Signal)
 	signal.Notify(sigCh, registerSignals...)
 
 	for {
-		sig = <-sigCh
-		fireSignalHook(PreSignal, sig)
-		switch sig {
-		case syscall.SIGHUP:
-			util.Logger().Debugf("received signal 'SIGHUP', now forking")
-			err := fork()
-			if err != nil {
-				util.Logger().Errorf(err, "fork a process failed")
+		select {
+		case <-ctx.Done():
+			return
+		case sig = <-sigCh:
+			fireSignalHook(PreSignal, sig)
+			switch sig {
+			case syscall.SIGHUP:
+				util.Logger().Debugf("received signal '%v', now forking", sig)
+				err := fork()
+				if err != nil {
+					util.Logger().Errorf(err, "fork a process failed")
+				}
 			}
-		default:
-			util.Logger().Warnf(nil, "received signal '%v'", sig)
+			fireSignalHook(PostSignal, sig)
 		}
-		fireSignalHook(PostSignal, sig)
 	}
 }
 
diff --git a/pkg/util/goroutines.go b/pkg/util/goroutines.go
index a021f52c..bb9b9d99 100644
--- a/pkg/util/goroutines.go
+++ b/pkg/util/goroutines.go
@@ -16,42 +16,37 @@
  */
 package util
 
-import "sync"
+import (
+	"golang.org/x/net/context"
+	"sync"
+)
 
 type GoRoutine struct {
-	stopCh chan struct{}
+	ctx    context.Context
+	cancel context.CancelFunc
 	wg     sync.WaitGroup
 	mux    sync.RWMutex
-	once   sync.Once
 	closed bool
 }
 
-func (g *GoRoutine) Init(stopCh chan struct{}) {
-	g.once.Do(func() {
-		g.stopCh = stopCh
-	})
-}
-
-func (g *GoRoutine) StopCh() <-chan struct{} {
-	return g.stopCh
-}
-
-func (g *GoRoutine) Do(f func(<-chan struct{})) {
+func (g *GoRoutine) Do(f func(context.Context)) {
 	g.wg.Add(1)
 	go func() {
 		defer g.wg.Done()
-		f(g.StopCh())
+		defer RecoverAndReport()
+		f(g.ctx)
 	}()
 }
 
 func (g *GoRoutine) Close(wait bool) {
 	g.mux.Lock()
 	defer g.mux.Unlock()
+
 	if g.closed {
 		return
 	}
 	g.closed = true
-	close(g.stopCh)
+	g.cancel()
 	if wait {
 		g.Wait()
 	}
@@ -61,27 +56,26 @@ func (g *GoRoutine) Wait() {
 	g.wg.Wait()
 }
 
-var defaultGo GoRoutine
+var defaultGo *GoRoutine
 
 func init() {
-	GoInit()
+	defaultGo = NewGo(context.Background())
 }
 
-func Go(f func(<-chan struct{})) {
+func Go(f func(context.Context)) {
 	defaultGo.Do(f)
 }
 
-func GoInit() {
-	defaultGo.Init(make(chan struct{}))
-}
-
 func GoCloseAndWait() {
 	defaultGo.Close(true)
-	Logger().Debugf("all goroutines quit normally")
+	Logger().Debugf("all goroutines exited")
 }
 
-func NewGo(stopCh chan struct{}) *GoRoutine {
-	gr := &GoRoutine{}
-	gr.Init(stopCh)
+func NewGo(ctx context.Context) *GoRoutine {
+	ctx, cancel := context.WithCancel(ctx)
+	gr := &GoRoutine{
+		ctx:    ctx,
+		cancel: cancel,
+	}
 	return gr
 }
diff --git a/pkg/util/goroutines_test.go b/pkg/util/goroutines_test.go
index cfc09191..d8088142 100644
--- a/pkg/util/goroutines_test.go
+++ b/pkg/util/goroutines_test.go
@@ -18,72 +18,66 @@ package util
 
 import (
 	"fmt"
+	"golang.org/x/net/context"
 	"sync"
 	"testing"
 	"time"
 )
 
-func TestGoRoutine_Init(t *testing.T) {
-	var test GoRoutine
-	stopCh1 := make(chan struct{})
-	defer close(stopCh1)
-	stopCh2 := make(chan struct{})
-	defer close(stopCh2)
-
-	test.Init(stopCh1)
-	c := test.StopCh()
-	if c != stopCh1 {
-		fail(t, "init GoRoutine failed.")
-	}
-
-	test.Init(stopCh2)
-	c = test.StopCh()
-	if c == stopCh2 {
-		fail(t, "init GoRoutine twice.")
-	}
-}
-
 func TestGoRoutine_Do(t *testing.T) {
-	var test1 GoRoutine
-	stopCh := make(chan struct{})
-	test1.Init(make(chan struct{}))
-	test1.Do(func(neverStopCh <-chan struct{}) {
-		defer close(stopCh)
+	test1 := NewGo(context.Background())
+	defer test1.Close(true)
+	stopCh1 := make(chan struct{})
+	test1.Do(func(ctx context.Context) {
+		defer close(stopCh1)
 		select {
-		case <-neverStopCh:
-			fail(t, "neverStopCh should not be closed.")
+		case <-ctx.Done():
+			fail(t, "ctx should not be done.")
 		case <-time.After(time.Second):
 		}
 	})
-	<-stopCh
+	<-stopCh1
 
-	var test2 GoRoutine
-	stopCh1 := make(chan struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+	test2 := NewGo(ctx)
+	defer test2.Close(true)
 	stopCh2 := make(chan struct{})
-	test2.Init(stopCh1)
-	test2.Do(func(stopCh <-chan struct{}) {
+	test2.Do(func(ctx context.Context) {
 		defer close(stopCh2)
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 		case <-time.After(time.Second):
-			fail(t, "time out to wait stopCh1 close.")
+			fail(t, "time out to wait stopCh2 close.")
 		}
 	})
-	close(stopCh1)
+	cancel()
 	<-stopCh2
+
+	ctx, _ = context.WithTimeout(context.Background(), 0)
+	test3 := NewGo(ctx)
+	defer test3.Close(true)
+	stopCh3 := make(chan struct{})
+	test3.Do(func(ctx context.Context) {
+		defer close(stopCh3)
+		select {
+		case <-ctx.Done():
+		case <-time.After(time.Second):
+			fail(t, "time out to wait ctx done.")
+		}
+	})
+	<-stopCh3
 }
 
 func TestGoRoutine_Wait(t *testing.T) {
-	var test GoRoutine
 	var mux sync.Mutex
 	MAX := 10
 	resultArr := make([]int, 0, MAX)
-	test.Init(make(chan struct{}))
+	test := NewGo(context.Background())
 	for i := 0; i < MAX; i++ {
 		func(i int) {
-			test.Do(func(neverStopCh <-chan struct{}) {
+			test.Do(func(ctx context.Context) {
 				select {
-				case <-neverStopCh:
+				case <-ctx.Done():
 				case <-time.After(time.Second):
 					mux.Lock()
 					resultArr = append(resultArr, i)
@@ -103,13 +97,12 @@ func TestGoRoutine_Wait(t *testing.T) {
 }
 
 func TestGoRoutine_Close(t *testing.T) {
-	var test GoRoutine
-	test.Init(make(chan struct{}))
-	test.Do(func(stopCh <-chan struct{}) {
+	test := NewGo(context.Background())
+	test.Do(func(ctx context.Context) {
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 		case <-time.After(time.Second):
-			fail(t, "time out to wait stopCh close.")
+			fail(t, "time out to wait ctx close.")
 		}
 	})
 	test.Close(true)
@@ -117,20 +110,18 @@ func TestGoRoutine_Close(t *testing.T) {
 }
 
 func TestGo(t *testing.T) {
-	GoInit()
-	Go(func(stopCh <-chan struct{}) {
+	Go(func(ctx context.Context) {
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-time.After(time.Second):
 			}
 		}
 	})
+	Go(func(ctx context.Context) {
+		var a *int
+		fmt.Println(*a)
+	})
 	GoCloseAndWait()
 }
-
-func TestNewGo(t *testing.T) {
-	g := NewGo(make(chan struct{}))
-	defer g.Close(true)
-}
diff --git a/pkg/util/log.go b/pkg/util/log.go
index 5e475a7b..bc82a899 100644
--- a/pkg/util/log.go
+++ b/pkg/util/log.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"github.com/ServiceComb/paas-lager"
 	"github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager"
+	"golang.org/x/net/context"
 	"os"
 	"path/filepath"
 	"runtime"
@@ -57,7 +58,7 @@ func init() {
 	loggers = make(map[string]lager.Logger, 10)
 	loggerNames = make(map[string]string, 10)
 	// make LOGGER do not be nil, new a stdout logger
-	LOGGER = newLogger(fromLagerConfig(defaultLagerConfig))
+	LOGGER = NewLogger(fromLagerConfig(defaultLagerConfig))
 }
 
 func fromLagerConfig(c *stlager.Config) LoggerConfig {
@@ -82,7 +83,7 @@ func toLagerConfig(c LoggerConfig) stlager.Config {
 }
 
 // newLog new log, unsafe
-func newLogger(cfg LoggerConfig) lager.Logger {
+func NewLogger(cfg LoggerConfig) lager.Logger {
 	stlager.Init(toLagerConfig(cfg))
 	return stlager.NewLogger(cfg.LoggerFile)
 }
@@ -93,7 +94,7 @@ func InitGlobalLogger(cfg LoggerConfig) {
 		cfg.LoggerLevel = defaultLagerConfig.LoggerLevel
 	}
 	loggerConfig = cfg
-	LOGGER = newLogger(cfg)
+	LOGGER = NewLogger(cfg)
 	// log rotate
 	RunLogDirRotate(cfg)
 	// recreate the deleted log file
@@ -144,7 +145,7 @@ func Logger() lager.Logger {
 			if len(cfg.LoggerFile) != 0 {
 				cfg.LoggerFile = filepath.Join(filepath.Dir(cfg.LoggerFile), logFile+".log")
 			}
-			logger = newLogger(cfg)
+			logger = NewLogger(cfg)
 			loggers[logFile] = logger
 			LOGGER.Warnf(nil, "match %s, new logger %s for %s", prefix, logFile, funcFullName)
 		}
@@ -190,10 +191,10 @@ func monitorLogFile() {
 	if len(loggerConfig.LoggerFile) == 0 {
 		return
 	}
-	Go(func(stopCh <-chan struct{}) {
+	Go(func(ctx context.Context) {
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-time.After(time.Minute):
 				Logger().Debug(fmt.Sprintf("Check log file at %s", time.Now()))
diff --git a/pkg/util/logrotate.go b/pkg/util/logrotate.go
index db4b79f9..e7dbcefd 100644
--- a/pkg/util/logrotate.go
+++ b/pkg/util/logrotate.go
@@ -19,6 +19,7 @@ package util
 import (
 	"archive/zip"
 	"fmt"
+	"golang.org/x/net/context"
 	"io"
 	"os"
 	"path/filepath"
@@ -293,10 +294,10 @@ func CopyFile(srcFile, destFile string) error {
 }
 
 func RunLogDirRotate(cfg LoggerConfig) {
-	Go(func(stopCh <-chan struct{}) {
+	Go(func(ctx context.Context) {
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-time.After(cfg.LogRotatePeriod):
 				LogRotate(filepath.Dir(cfg.LoggerFile), cfg.LogRotateSize, cfg.LogBackupCount)
diff --git a/server/api.go b/server/api.go
index 26ac55a2..f434bdc4 100644
--- a/server/api.go
+++ b/server/api.go
@@ -36,8 +36,9 @@ func init() {
 	InitAPI()
 
 	apiServer = &APIServer{
-		isClose: true,
-		err:     make(chan error, 1),
+		isClose:   true,
+		err:       make(chan error, 1),
+		goroutine: util.NewGo(context.Background()),
 	}
 }
 
@@ -66,6 +67,7 @@ type APIServer struct {
 	isClose   bool
 	forked    bool
 	err       chan error
+	goroutine *util.GoRoutine
 }
 
 const (
@@ -176,16 +178,18 @@ func (s *APIServer) doAPIServerHeartBeat(pCtx context.Context) {
 }
 
 func (s *APIServer) startHeartBeatService() {
-	go func() {
+	s.goroutine.Do(func(ctx context.Context) {
 		for {
 			select {
+			case <-ctx.Done():
+				return
 			case <-s.err:
 				return
 			case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
 				s.doAPIServerHeartBeat(context.Background())
 			}
 		}
-	}()
+	})
 }
 
 func (s *APIServer) graceDone() {
@@ -211,14 +215,14 @@ func (s *APIServer) startRESTServer() (err error) {
 	}
 	util.Logger().Infof("Local listen address: %s, host: %s.", ep, s.HostName)
 
-	go func() {
+	s.goroutine.Do(func(_ context.Context) {
 		err := s.restSrv.Serve()
 		if s.isClose {
 			return
 		}
 		util.Logger().Errorf(err, "error to start REST API server %s", ep)
 		s.err <- err
-	}()
+	})
 	return
 }
 
@@ -234,14 +238,14 @@ func (s *APIServer) startRPCServer() (err error) {
 	}
 	util.Logger().Infof("Local listen address: %s, host: %s.", ep, s.HostName)
 
-	go func() {
+	s.goroutine.Do(func(_ context.Context) {
 		err := s.rpcSrv.Serve()
 		if s.isClose {
 			return
 		}
 		util.Logger().Errorf(err, "error to start RPC API server %s", ep)
 		s.err <- err
-	}()
+	})
 	return
 }
 
@@ -301,6 +305,8 @@ func (s *APIServer) Stop() {
 
 	close(s.err)
 
+	s.goroutine.Close(true)
+
 	util.Logger().Info("api server stopped.")
 }
 
diff --git a/server/broker/service.go b/server/broker/service.go
index bde8f3e8..3ae63c61 100644
--- a/server/broker/service.go
+++ b/server/broker/service.go
@@ -34,7 +34,7 @@ import (
 	"golang.org/x/net/context"
 )
 
-var BrokerServiceAPI *BrokerService = &BrokerService{}
+var BrokerServiceAPI = &BrokerService{}
 
 type BrokerService struct {
 }
diff --git a/server/broker/store.go b/server/broker/store.go
index 0cdddc59..95c61a41 100644
--- a/server/broker/store.go
+++ b/server/broker/store.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	sstore "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
+	"golang.org/x/net/context"
 )
 
 const (
@@ -72,12 +73,16 @@ func (s *BKvStore) newStore(t sstore.StoreType, opts ...sstore.KvCacherCfgOption
 	s.newIndexer(t, sstore.NewKvCacher(opts...))
 }
 
-func (s *BKvStore) store() {
+func (s *BKvStore) store(ctx context.Context) {
 	for t := sstore.StoreType(0); t != typeEnd; t++ {
 		s.newStore(t)
 	}
 	for _, i := range s.bindexers {
-		<-i.Ready()
+		select {
+		case <-ctx.Done():
+			return
+		case <-i.Ready():
+		}
 	}
 	util.SafeCloseChan(s.bready)
 
@@ -120,7 +125,13 @@ func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) {
 }
 
 func (s *BKvStore) Run() {
-	go s.store()
+	util.Go(func(ctx context.Context) {
+		s.store(ctx)
+		select {
+		case <-ctx.Done():
+			s.Stop()
+		}
+	})
 }
 
 func (s *BKvStore) Ready() <-chan struct{} {
@@ -154,3 +165,18 @@ func (s *BKvStore) Verification() *sstore.Indexer {
 func (s *BKvStore) PactLatest() *sstore.Indexer {
 	return s.bindexers[PACT_LATEST]
 }
+
+func (s *BKvStore) Stop() {
+	if s.bisClose {
+		return
+	}
+	s.bisClose = true
+
+	for _, i := range s.bindexers {
+		i.Stop()
+	}
+
+	util.SafeCloseChan(s.bready)
+
+	util.Logger().Debugf("broker store daemon stopped")
+}
diff --git a/server/broker/util.go b/server/broker/util.go
index 23e98ca1..895ee8ca 100644
--- a/server/broker/util.go
+++ b/server/broker/util.go
@@ -25,14 +25,16 @@ import (
 	"strconv"
 	"strings"
 
-	"github.com/ServiceComb/paas-lager"
 	"github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
-	backend "github.com/apache/incubator-servicecomb-service-center/server/core/backend"
+	"github.com/apache/incubator-servicecomb-service-center/server/core"
+	"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
 	scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
 	"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
 	serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
+	"path/filepath"
+	"time"
 )
 
 var PactLogger lager.Logger
@@ -88,12 +90,18 @@ var brokerAPILinksTitles = map[string]string{
 
 func init() {
 	//define Broker logger
-	stlager.Init(stlager.Config{
-		LoggerLevel:   "INFO",
-		LoggerFile:    "broker_srvc.log",
-		EnableRsyslog: false,
+	name := ""
+	if len(core.ServerInfo.Config.LogFilePath) != 0 {
+		name = filepath.Join(filepath.Dir(core.ServerInfo.Config.LogFilePath), "broker_srvc.log")
+	}
+	PactLogger = util.NewLogger(util.LoggerConfig{
+		LoggerLevel:     core.ServerInfo.Config.LogLevel,
+		LoggerFile:      name,
+		LogFormatText:   core.ServerInfo.Config.LogFormat == "text",
+		LogRotatePeriod: 30 * time.Second,
+		LogRotateSize:   int(core.ServerInfo.Config.LogRotateSize),
+		LogBackupCount:  int(core.ServerInfo.Config.LogBackupCount),
 	})
-	PactLogger = stlager.NewLogger("broker_srvc")
 }
 
 func GetDefaultTenantProject() string {
diff --git a/server/core/0_init.go b/server/core/0_init.go
index 1b888492..8749af62 100644
--- a/server/core/0_init.go
+++ b/server/core/0_init.go
@@ -87,7 +87,6 @@ func initLogger() {
 }
 
 func handleSignals() {
-	var sig os.Signal
 	sigCh := make(chan os.Signal)
 	signal.Notify(sigCh,
 		syscall.SIGINT,
@@ -95,13 +94,14 @@ func handleSignals() {
 		syscall.SIGTERM,
 	)
 	wait := 5 * time.Second
-	for {
-		sig = <-sigCh
+	for sig := range sigCh {
 		switch sig {
 		case syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM:
 			<-time.After(wait)
-			util.Logger().Warnf(nil, "Waiting for server response timed out(%s), force shutdown.", wait)
+			util.Logger().Warnf(nil, "waiting for server response timed out(%s), force shutdown", wait)
 			os.Exit(1)
+		default:
+			util.Logger().Warnf(nil, "received signal '%v'", sig)
 		}
 	}
 }
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index c6ade974..39ba733f 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -169,12 +169,12 @@ type KvCacher struct {
 	lastRev         int64
 	noEventInterval int
 
-	ready   chan struct{}
-	lw      ListWatcher
-	mux     sync.Mutex
-	once    sync.Once
-	cache   *KvCache
-	goroute *util.GoRoutine
+	ready     chan struct{}
+	lw        ListWatcher
+	mux       sync.Mutex
+	once      sync.Once
+	cache     *KvCache
+	goroutine *util.GoRoutine
 }
 
 func (c *KvCacher) needList() bool {
@@ -267,23 +267,18 @@ func (c *KvCacher) needDeferHandle(evts []*Event) bool {
 	return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
 }
 
-func (c *KvCacher) refresh(stopCh <-chan struct{}) {
+func (c *KvCacher) refresh(ctx context.Context) {
 	util.Logger().Debugf("start to list and watch %s", c.Cfg)
-	ctx, cancel := context.WithCancel(context.Background())
-	c.goroute.Do(func(stopCh <-chan struct{}) {
-		defer cancel()
-		<-stopCh
-	})
 	for {
 		start := time.Now()
 		c.ListAndWatch(ctx)
 		watchDuration := time.Since(start)
-		nextPeriod := 0 * time.Second
+		nextPeriod := c.Cfg.Period
 		if watchDuration > 0 && c.Cfg.Period > watchDuration {
 			nextPeriod = c.Cfg.Period - watchDuration
 		}
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			util.Logger().Debugf("stop to list and watch %s", c.Cfg)
 			return
 		case <-time.After(nextPeriod):
@@ -291,7 +286,7 @@ func (c *KvCacher) refresh(stopCh <-chan struct{}) {
 	}
 }
 
-func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
+func (c *KvCacher) deferHandle(ctx context.Context) {
 	if c.Cfg.DeferHandler == nil {
 		return
 	}
@@ -299,7 +294,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
 	i, evts := 0, make([]*Event, event_block_size)
 	for {
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			return
 		case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
 			if !ok {
@@ -524,8 +519,8 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {
 }
 
 func (c *KvCacher) run() {
-	c.goroute.Do(c.refresh)
-	c.goroute.Do(c.deferHandle)
+	c.goroutine.Do(c.refresh)
+	c.goroutine.Do(c.deferHandle)
 }
 
 func (c *KvCacher) Cache() Cache {
@@ -537,7 +532,7 @@ func (c *KvCacher) Run() {
 }
 
 func (c *KvCacher) Stop() {
-	c.goroute.Close(true)
+	c.goroutine.Close(true)
 
 	util.SafeCloseChan(c.ready)
 }
@@ -577,7 +572,7 @@ func NewKvCacher(opts ...KvCacherCfgOption) *KvCacher {
 			Client: backend.Registry(),
 			Key:    cfg.Key,
 		},
-		goroute: util.NewGo(make(chan struct{})),
+		goroutine: util.NewGo(context.Background()),
 	}
 	cacher.cache = NewKvCache(cacher, cfg.InitSize)
 	return cacher
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index d35f9734..43464f06 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -21,6 +21,7 @@ import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
 	"github.com/coreos/etcd/mvcc/mvccpb"
+	"golang.org/x/net/context"
 	"sync"
 	"time"
 )
@@ -98,12 +99,12 @@ func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
 	return iedh.deferCh
 }
 
-func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
+func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
 	defer util.RecoverAndReport()
 	t, n := iedh.newTimer(), false
 	for {
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			return
 		case evts := <-iedh.pendingCh:
 			for _, evt := range evts {
@@ -117,7 +118,7 @@ func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
 			}
 
 			total := iedh.cache.Size()
-			if !iedh.enabled && del > 0 && total > 0 && float64(del) >= float64(total)*iedh.Percent {
+			if !iedh.enabled && del > 0 && total > 5 && float64(del) >= float64(total)*iedh.Percent {
 				iedh.enabled = true
 				util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
 					del, total, iedh.Percent*100)
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 72360503..5cf3e6dc 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -186,11 +186,11 @@ func (i *Indexer) OnCacheEvent(evt *KvEvent) {
 }
 
 func (i *Indexer) buildIndex() {
-	i.goroutine.Do(func(stopCh <-chan struct{}) {
+	i.goroutine.Do(func(ctx context.Context) {
 		util.SafeCloseChan(i.ready)
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case evt, ok := <-i.prefixBuildQueue:
 				if !ok {
@@ -317,7 +317,7 @@ func NewCacheIndexer(t StoreType, cr Cacher) *Indexer {
 		cacheType:        t,
 		prefixIndex:      make(map[string]map[string]struct{}, DEFAULT_MAX_EVENT_COUNT),
 		prefixBuildQueue: make(chan *KvEvent, DEFAULT_MAX_EVENT_COUNT),
-		goroutine:        util.NewGo(make(chan struct{})),
+		goroutine:        util.NewGo(context.Background()),
 		ready:            make(chan struct{}),
 		isClose:          true,
 	}
diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go
index 189f9094..4ffb9d45 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -131,17 +131,17 @@ func (w *Watcher) EventBus() <-chan []*Event {
 	return w.bus
 }
 
-func (w *Watcher) process() {
+func (w *Watcher) process(_ context.Context) {
 	stopCh := make(chan struct{})
 	ctx, cancel := context.WithTimeout(w.ListOps.Context, w.ListOps.Timeout)
-	go func() {
+	util.Go(func(_ context.Context) {
 		defer close(stopCh)
 		w.lw.doWatch(ctx, w.sendEvent)
-	}()
+	})
 
 	select {
 	case <-stopCh:
-		// time out
+		// timed out or exception
 		w.Stop()
 	case <-w.stopCh:
 		cancel()
@@ -180,6 +180,6 @@ func newWatcher(lw *ListWatcher, listOps *ListOptions) *Watcher {
 		bus:     make(chan []*Event, EVENT_BUS_MAX_SIZE),
 		stopCh:  make(chan struct{}),
 	}
-	go w.process()
+	util.Go(w.process)
 	return w
 }
diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go
index 6a5f38c3..c462d411 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -109,6 +109,7 @@ type KvStore struct {
 	asyncTaskSvc *async.AsyncTaskService
 	lock         sync.RWMutex
 	ready        chan struct{}
+	goroutine    *util.GoRoutine
 	isClose      bool
 }
 
@@ -116,6 +117,7 @@ func (s *KvStore) Initialize() {
 	s.indexers = make(map[StoreType]*Indexer)
 	s.asyncTaskSvc = async.NewAsyncTaskService()
 	s.ready = make(chan struct{})
+	s.goroutine = util.NewGo(context.Background())
 
 	for i := StoreType(0); i != typeEnd; i++ {
 		store.newNullStore(i)
@@ -147,7 +149,7 @@ func (s *KvStore) newIndexer(t StoreType, cacher Cacher) {
 }
 
 func (s *KvStore) Run() {
-	go s.store()
+	s.goroutine.Do(s.store)
 	s.asyncTaskSvc.Run()
 }
 
@@ -166,7 +168,7 @@ func (s *KvStore) SelfPreservationHandler() DeferHandler {
 	return &InstanceEventDeferHandler{Percent: DEFAULT_SELF_PRESERVATION_PERCENT}
 }
 
-func (s *KvStore) store() {
+func (s *KvStore) store(ctx context.Context) {
 	for t := StoreType(0); t != typeEnd; t++ {
 		switch t {
 		case INSTANCE:
@@ -178,7 +180,11 @@ func (s *KvStore) store() {
 		}
 	}
 	for _, i := range s.indexers {
-		<-i.Ready()
+		select {
+		case <-ctx.Done():
+			return
+		case <-i.Ready():
+		}
 	}
 	util.SafeCloseChan(s.ready)
 
@@ -214,9 +220,11 @@ func (s *KvStore) Stop() {
 
 	s.asyncTaskSvc.Stop()
 
+	s.goroutine.Close(true)
+
 	util.SafeCloseChan(s.ready)
 
-	util.Logger().Debugf("store daemon stopped.")
+	util.Logger().Debugf("store daemon stopped")
 }
 
 func (s *KvStore) Ready() <-chan struct{} {
diff --git a/server/infra/registry/registry.go b/server/infra/registry/registry.go
index 575a351f..0856fa3b 100644
--- a/server/infra/registry/registry.go
+++ b/server/infra/registry/registry.go
@@ -144,7 +144,7 @@ const (
 )
 
 const (
-	REQUEST_TIMEOUT = 300
+	REQUEST_TIMEOUT = 30 * time.Second
 
 	DEFAULT_PAGE_COUNT = 4096 // grpc does not allow to transport a large body more then 4MB in a request.
 )
@@ -359,7 +359,7 @@ func OpCmp(opt CompareOperation, result CompareResult, v interface{}) (cmp Compa
 }
 
 func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
-	return context.WithTimeout(ctx, REQUEST_TIMEOUT*time.Second)
+	return context.WithTimeout(ctx, REQUEST_TIMEOUT)
 }
 
 func RegistryConfig() *Config {
diff --git a/server/plugin/infra/registry/embededetcd/embededetcd.go b/server/plugin/infra/registry/embededetcd/embededetcd.go
index 4b941bce..3bb71401 100644
--- a/server/plugin/infra/registry/embededetcd/embededetcd.go
+++ b/server/plugin/infra/registry/embededetcd/embededetcd.go
@@ -41,16 +41,17 @@ import (
 
 var embedTLSConfig *tls.Config
 
-const START_MANAGER_SERVER_TIMEOUT = 60
+const START_MANAGER_SERVER_TIMEOUT = 10
 
 func init() {
 	mgr.RegisterPlugin(mgr.Plugin{mgr.REGISTRY, "embeded_etcd", getEmbedInstance})
 }
 
 type EtcdEmbed struct {
-	Server *embed.Etcd
-	err    chan error
-	ready  chan int
+	Embed     *embed.Etcd
+	err       chan error
+	ready     chan int
+	goroutine *util.GoRoutine
 }
 
 func (s *EtcdEmbed) Err() <-chan error {
@@ -62,9 +63,10 @@ func (s *EtcdEmbed) Ready() <-chan int {
 }
 
 func (s *EtcdEmbed) Close() {
-	if s.Server != nil {
-		s.Server.Close()
+	if s.Embed != nil {
+		s.Embed.Close()
 	}
+	s.goroutine.Close(true)
 	util.Logger().Debugf("embedded etcd client stopped.")
 }
 
@@ -232,7 +234,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve int64) error {
 	}
 
 	util.Logger().Infof("Compacting... revision is %d(current: %d, reserve %d)", revToCompact, curRev, reserve)
-	_, err := s.Server.Server.Compact(ctx, &etcdserverpb.CompactionRequest{
+	_, err := s.Embed.Server.Compact(ctx, &etcdserverpb.CompactionRequest{
 		Revision: revToCompact,
 		Physical: true,
 	})
@@ -250,7 +252,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve int64) error {
 }
 
 func (s *EtcdEmbed) getLeaderCurrentRevision(ctx context.Context) int64 {
-	return s.Server.Server.KV().Rev()
+	return s.Embed.Server.KV().Rev()
 }
 
 func (s *EtcdEmbed) PutNoOverride(ctx context.Context, opts ...registry.PluginOpOption) (bool, error) {
@@ -275,7 +277,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
 	switch op.Action {
 	case registry.Get:
 		var etcdResp *etcdserverpb.RangeResponse
-		etcdResp, err = s.Server.Server.Range(otCtx, s.toGetRequest(op))
+		etcdResp, err = s.Embed.Server.Range(otCtx, s.toGetRequest(op))
 		if err != nil {
 			break
 		}
@@ -286,7 +288,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
 		}
 	case registry.Put:
 		var etcdResp *etcdserverpb.PutResponse
-		etcdResp, err = s.Server.Server.Put(otCtx, s.toPutRequest(op))
+		etcdResp, err = s.Embed.Server.Put(otCtx, s.toPutRequest(op))
 		if err != nil {
 			break
 		}
@@ -295,7 +297,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
 		}
 	case registry.Delete:
 		var etcdResp *etcdserverpb.DeleteRangeResponse
-		etcdResp, err = s.Server.Server.DeleteRange(otCtx, s.toDeleteRequest(op))
+		etcdResp, err = s.Embed.Server.DeleteRange(otCtx, s.toDeleteRequest(op))
 		if err != nil {
 			break
 		}
@@ -338,7 +340,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success []registry.PluginOp,
 	if len(etcdFailOps) > 0 {
 		txnRequest.Failure = etcdFailOps
 	}
-	resp, err := s.Server.Server.Txn(otCtx, txnRequest)
+	resp, err := s.Embed.Server.Txn(otCtx, txnRequest)
 	if err != nil {
 		return nil, err
 	}
@@ -351,7 +353,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success []registry.PluginOp,
 func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) {
 	otCtx, cancel := registry.WithTimeout(ctx)
 	defer cancel()
-	etcdResp, err := s.Server.Server.LeaseGrant(otCtx, &etcdserverpb.LeaseGrantRequest{
+	etcdResp, err := s.Embed.Server.LeaseGrant(otCtx, &etcdserverpb.LeaseGrantRequest{
 		TTL: TTL,
 	})
 	if err != nil {
@@ -363,7 +365,7 @@ func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) {
 func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, error) {
 	otCtx, cancel := registry.WithTimeout(ctx)
 	defer cancel()
-	ttl, err := s.Server.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID))
+	ttl, err := s.Embed.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID))
 	if err != nil {
 		if err.Error() == grpc.ErrorDesc(rpctypes.ErrGRPCLeaseNotFound) {
 			return 0, err
@@ -376,7 +378,7 @@ func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, error
 func (s *EtcdEmbed) LeaseRevoke(ctx context.Context, leaseID int64) error {
 	otCtx, cancel := registry.WithTimeout(ctx)
 	defer cancel()
-	_, err := s.Server.Server.LeaseRevoke(otCtx, &etcdserverpb.LeaseRevokeRequest{
+	_, err := s.Embed.Server.LeaseRevoke(otCtx, &etcdserverpb.LeaseRevokeRequest{
 		ID: leaseID,
 	})
 	if err != nil {
@@ -392,7 +394,7 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
 	op := registry.OpGet(opts...)
 
 	if len(op.Key) > 0 {
-		watchable := s.Server.Server.Watchable()
+		watchable := s.Embed.Server.Watchable()
 		ws := watchable.NewWatchStream()
 		defer ws.Close()
 
@@ -455,6 +457,29 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
 	return
 }
 
+func (s *EtcdEmbed) ReadyNotify() {
+	timeout := START_MANAGER_SERVER_TIMEOUT * time.Second
+	select {
+	case <-s.Embed.Server.ReadyNotify():
+		close(s.ready)
+		s.goroutine.Do(func(ctx context.Context) {
+			select {
+			case <-ctx.Done():
+				return
+			case err := <-s.Embed.Err():
+				s.err <- err
+			}
+		})
+	case <-time.After(timeout):
+		err := fmt.Errorf("timed out(%s)", timeout)
+		util.Logger().Errorf(err, "read notify failed")
+
+		s.Embed.Server.Stop()
+
+		s.err <- err
+	}
+}
+
 func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *mvccpb.Event) registry.ActionType {
 	switch evt.Type {
 	case mvccpb.DELETE:
@@ -488,8 +513,9 @@ func getEmbedInstance() mgr.PluginInstance {
 	addrs := beego.AppConfig.DefaultString("manager_addr", "http://127.0.0.1:2380")
 
 	inst := &EtcdEmbed{
-		err:   make(chan error, 1),
-		ready: make(chan int),
+		err:       make(chan error, 1),
+		ready:     make(chan int),
+		goroutine: util.NewGo(context.Background()),
 	}
 
 	if core.ServerInfo.Config.SslEnabled {
@@ -537,30 +563,14 @@ func getEmbedInstance() mgr.PluginInstance {
 		inst.err <- err
 		return inst
 	}
-	inst.Server = etcd
-
-	select {
-	case <-etcd.Server.ReadyNotify():
-		close(inst.ready)
-		go func() {
-			select {
-			case err := <-etcd.Err():
-				inst.err <- err
-			}
-		}()
-	case <-time.After(START_MANAGER_SERVER_TIMEOUT * time.Second):
-		message := "etcd server took too long to start"
-		util.Logger().Error(message, nil)
+	inst.Embed = etcd
 
-		etcd.Server.Stop()
-
-		inst.err <- errors.New(message)
-	}
+	inst.ReadyNotify()
 	return inst
 }
 
 func parseURL(addrs string) ([]url.URL, error) {
-	urls := []url.URL{}
+	var urls []url.URL
 	ips := strings.Split(addrs, ",")
 	for _, ip := range ips {
 		addr, err := url.Parse(ip)
diff --git a/server/plugin/infra/tracing/buildin/file_collector.go b/server/plugin/infra/tracing/buildin/file_collector.go
index bd48e5bd..851b3fc7 100644
--- a/server/plugin/infra/tracing/buildin/file_collector.go
+++ b/server/plugin/infra/tracing/buildin/file_collector.go
@@ -23,15 +23,19 @@ import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	"github.com/apache/incubator-servicecomb-service-center/server/core"
 	"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+	"golang.org/x/net/context"
 	"os"
+	"strings"
 	"time"
 )
 
 type FileCollector struct {
 	Fd        *os.File
+	Timeout   time.Duration
 	Interval  time.Duration
 	BatchSize int
 	c         chan *zipkincore.Span
+	goroutine *util.GoRoutine
 }
 
 func (f *FileCollector) Collect(span *zipkincore.Span) error {
@@ -39,11 +43,16 @@ func (f *FileCollector) Collect(span *zipkincore.Span) error {
 		return fmt.Errorf("required FD to write")
 	}
 
-	f.c <- span
+	select {
+	case f.c <- span:
+	case <-time.After(f.Timeout):
+		util.Logger().Errorf(nil, "send span to handle channel timed out(%s)", f.Timeout)
+	}
 	return nil
 }
 
 func (f *FileCollector) Close() error {
+	f.goroutine.Close(true)
 	return f.Fd.Close()
 }
 
@@ -77,7 +86,7 @@ func (f *FileCollector) write(batch []*zipkincore.Span) (c int) {
 }
 
 func (f *FileCollector) checkFile() error {
-	if util.PathExist(f.Fd.Name()) {
+	if util.PathExist(f.Fd.Name()) || strings.Index(f.Fd.Name(), "/dev/") == 0 {
 		return nil
 	}
 
@@ -100,52 +109,54 @@ func (f *FileCollector) checkFile() error {
 	return nil
 }
 
-func (f *FileCollector) Run(stopCh <-chan struct{}) {
-	var (
-		batch []*zipkincore.Span
-		prev  []*zipkincore.Span
-		i     = f.Interval * 10
-		t     = time.NewTicker(f.Interval)
-		nr    = time.Now().Add(i)
-		max   = f.BatchSize * 2
-	)
-	for {
-		select {
-		case <-stopCh:
-			f.write(batch)
-			return
-		case span := <-f.c:
-			batch = append(batch, span)
-			if len(batch) >= f.BatchSize {
-				if len(batch) > max {
-					dispose := len(batch) - f.BatchSize
-					util.Logger().Errorf(nil, "backlog is full, dispose %d span(s), max: %d",
-						dispose, max)
-					batch = batch[dispose:] // allocate more
-				}
-				if c := f.write(batch); c == 0 {
-					continue
+func (f *FileCollector) Run() {
+	f.goroutine.Do(func(ctx context.Context) {
+		var (
+			batch []*zipkincore.Span
+			prev  []*zipkincore.Span
+			i     = f.Interval * 10
+			t     = time.NewTicker(f.Interval)
+			nr    = time.Now().Add(i)
+			max   = f.BatchSize * 2
+		)
+		for {
+			select {
+			case <-ctx.Done():
+				f.write(batch)
+				return
+			case span := <-f.c:
+				batch = append(batch, span)
+				if len(batch) >= f.BatchSize {
+					if len(batch) > max {
+						dispose := len(batch) - f.BatchSize
+						util.Logger().Errorf(nil, "backlog is full, dispose %d span(s), max: %d",
+							dispose, max)
+						batch = batch[dispose:] // allocate more
+					}
+					if c := f.write(batch); c == 0 {
+						continue
+					}
+					if prev != nil {
+						batch, prev = prev[:0], batch
+					} else {
+						prev, batch = batch, batch[len(batch):] // new one
+					}
 				}
-				if prev != nil {
-					batch, prev = prev[:0], batch
-				} else {
-					prev, batch = batch, batch[len(batch):] // new one
+			case <-t.C:
+				if time.Now().After(nr) {
+					util.LogRotateFile(f.Fd.Name(),
+						int(core.ServerInfo.Config.LogRotateSize),
+						int(core.ServerInfo.Config.LogBackupCount),
+					)
+					nr = time.Now().Add(i)
 				}
-			}
-		case <-t.C:
-			if time.Now().After(nr) {
-				util.LogRotateFile(f.Fd.Name(),
-					int(core.ServerInfo.Config.LogRotateSize),
-					int(core.ServerInfo.Config.LogBackupCount),
-				)
-				nr = time.Now().Add(i)
-			}
 
-			if c := f.write(batch); c > 0 {
-				batch = batch[:0]
+				if c := f.write(batch); c > 0 {
+					batch = batch[:0]
+				}
 			}
 		}
-	}
+	})
 }
 
 func NewFileCollector(path string) (*FileCollector, error) {
@@ -155,10 +166,12 @@ func NewFileCollector(path string) (*FileCollector, error) {
 	}
 	fc := &FileCollector{
 		Fd:        fd,
+		Timeout:   5 * time.Second,
 		Interval:  10 * time.Second,
 		BatchSize: 100,
 		c:         make(chan *zipkincore.Span, 1000),
+		goroutine: util.NewGo(context.Background()),
 	}
-	util.Go(fc.Run)
+	fc.Run()
 	return fc, nil
 }
diff --git a/server/plugin/infra/tracing/buildin/file_collector_test.go b/server/plugin/infra/tracing/buildin/file_collector_test.go
index 075fdaec..4f3e1505 100644
--- a/server/plugin/infra/tracing/buildin/file_collector_test.go
+++ b/server/plugin/infra/tracing/buildin/file_collector_test.go
@@ -17,34 +17,33 @@
 package buildin
 
 import (
+	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+	"golang.org/x/net/context"
 	"os"
 	"testing"
 	"time"
 )
 
 func TestFileCollector_Collect(t *testing.T) {
-	fileName := "./test"
-	fd, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
-	if err != nil {
-		t.FailNow()
-	}
 	fc := &FileCollector{
-		Fd:        fd,
+		Fd:        os.Stdout,
+		Timeout:   1 * time.Second,
 		Interval:  100 * time.Second,
 		BatchSize: 2,
-		c:         make(chan *zipkincore.Span, 1000),
+		c:         make(chan *zipkincore.Span, 100),
+		goroutine: util.NewGo(context.Background()),
 	}
 	defer func() {
 		fc.Close()
-		os.Remove(fileName)
 	}()
-	util.Go(fc.Run)
+	fc.Run()
 
-	for i := int64(0); i < 3; i++ {
-		err := fc.Collect(&zipkincore.Span{ParentID: &i, TraceIDHigh: &i})
+	for i := 0; i < 10; i++ {
+		err := fc.Collect(&zipkincore.Span{})
 		if err != nil {
+			fmt.Println(err)
 			t.FailNow()
 		}
 	}
diff --git a/server/plugin/infra/tracing/buildin/span.go b/server/plugin/infra/tracing/buildin/span.go
index 5b8011d9..3d200887 100644
--- a/server/plugin/infra/tracing/buildin/span.go
+++ b/server/plugin/infra/tracing/buildin/span.go
@@ -61,7 +61,9 @@ type Endpoint struct {
 func (s *Span) FromZipkinSpan(span *zipkincore.Span) {
 	traceId := new(types.TraceID)
 	traceId.Low = uint64(span.TraceID)
-	traceId.High = uint64(*(span.TraceIDHigh))
+	if span.TraceIDHigh != nil {
+		traceId.High = uint64(*(span.TraceIDHigh))
+	}
 	s.TraceID = traceId.ToHex()
 	s.Duration = span.Duration
 
diff --git a/server/plugin/infra/tracing/buildin/span_test.go b/server/plugin/infra/tracing/buildin/span_test.go
index c3dceb49..069e6a9a 100644
--- a/server/plugin/infra/tracing/buildin/span_test.go
+++ b/server/plugin/infra/tracing/buildin/span_test.go
@@ -158,4 +158,12 @@ func TestFromZipkinSpan(t *testing.T) {
 		t.FailNow()
 	}
 	fmt.Println(string(b))
+
+	s = FromZipkinSpan(&zipkincore.Span{})
+	b, err = json.Marshal(s)
+	if err != nil {
+		fmt.Println("TestFromZipkinSpan Marshal", err)
+		t.FailNow()
+	}
+	fmt.Println(string(b))
 }
diff --git a/server/server.go b/server/server.go
index a5468798..15e0abd3 100644
--- a/server/server.go
+++ b/server/server.go
@@ -43,6 +43,7 @@ func init() {
 		store:         st.Store(),
 		notifyService: nf.GetNotifyService(),
 		apiServer:     GetAPIServer(),
+		goroutine:     util.NewGo(context.Background()),
 	}
 }
 
@@ -50,6 +51,7 @@ type ServiceCenterServer struct {
 	apiServer     *APIServer
 	notifyService *nf.NotifyService
 	store         *st.KvStore
+	goroutine     *util.GoRoutine
 }
 
 func (s *ServiceCenterServer) Run() {
@@ -74,7 +76,7 @@ func (s *ServiceCenterServer) waitForQuit() {
 
 	s.Stop()
 
-	util.Logger().Warn("service center quit", nil)
+	util.Logger().Debugf("service center stopped")
 }
 
 func (s *ServiceCenterServer) needUpgrade() bool {
@@ -119,12 +121,12 @@ func (s *ServiceCenterServer) autoCompactBackend() {
 		util.Logger().Errorf(err, "invalid compact interval %s, reset to default interval 12h", core.ServerInfo.Config.CompactInterval)
 		interval = 12 * time.Hour
 	}
-	util.Go(func(stopCh <-chan struct{}) {
+	s.goroutine.Do(func(ctx context.Context) {
 		util.Logger().Infof("enabled the automatic compact mechanism, compact once every %s, reserve %d",
 			core.ServerInfo.Config.CompactInterval, delta)
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-time.After(interval):
 				lock, err := mux.Try(mux.GLOBAL_LOCK)
@@ -133,7 +135,7 @@ func (s *ServiceCenterServer) autoCompactBackend() {
 					continue
 				}
 
-				backend.Registry().Compact(context.Background(), delta)
+				backend.Registry().Compact(ctx, delta)
 
 				lock.Unlock()
 			}
@@ -190,9 +192,7 @@ func (s *ServiceCenterServer) Stop() {
 		s.store.Stop()
 	}
 
-	util.GoCloseAndWait()
-
-	backend.Registry().Close()
+	s.goroutine.Close(true)
 }
 
 func Run() {
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index 7bbe5df8..6961c816 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -50,7 +50,7 @@ func (h *DependencyEventHandler) OnEvent(evt *store.KvEvent) {
 }
 
 func (h *DependencyEventHandler) loop() {
-	util.Go(func(stopCh <-chan struct{}) {
+	util.Go(func(ctx context.Context) {
 		waitDelayIndex := 0
 		waitDelay := []int{1, 1, 5, 10, 20, 30, 60}
 		retry := func() {
@@ -64,7 +64,7 @@ func (h *DependencyEventHandler) loop() {
 		}
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-h.signals.Chan():
 				lock, err := mux.Try(mux.DEP_QUEUE_LOCK)
diff --git a/server/service/instances.go b/server/service/instances.go
index 48c70296..eb945069 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -370,19 +370,7 @@ func (s *InstanceService) HeartbeatSet(ctx context.Context, in *pb.HeartbeatSetR
 			existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true
 			noMultiCounter++
 		}
-		go func(element *pb.HeartbeatSetElement) {
-			hbRst := &pb.InstanceHbRst{
-				ServiceId:  element.ServiceId,
-				InstanceId: element.InstanceId,
-				ErrMessage: "",
-			}
-			_, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId)
-			if err != nil {
-				hbRst.ErrMessage = err.Error()
-				util.Logger().Errorf(err, "heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId)
-			}
-			instancesHbRst <- hbRst
-		}(heartbeatElement)
+		util.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, heartbeatElement))
 	}
 	count := 0
 	successFlag := false
@@ -415,6 +403,22 @@ func (s *InstanceService) HeartbeatSet(ctx context.Context, in *pb.HeartbeatSetR
 	}
 }
 
+func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) {
+	return func(_ context.Context) {
+		hbRst := &pb.InstanceHbRst{
+			ServiceId:  element.ServiceId,
+			InstanceId: element.InstanceId,
+			ErrMessage: "",
+		}
+		_, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId)
+		if err != nil {
+			hbRst.ErrMessage = err.Error()
+			util.Logger().Errorf(err, "heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId)
+		}
+		instancesHbRst <- hbRst
+	}
+}
+
 func (s *InstanceService) GetOneInstance(ctx context.Context, in *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) {
 	checkErr := s.getInstancePreCheck(ctx, in)
 	if checkErr != nil {
@@ -723,7 +727,6 @@ func (s *InstanceService) UpdateInstanceProperties(ctx context.Context, in *pb.U
 	}, nil
 }
 
-
 func (s *InstanceService) WatchPreOpera(ctx context.Context, in *pb.WatchInstanceRequest) error {
 	if in == nil || len(in.SelfServiceId) == 0 {
 		return errors.New("Request format invalid.")
@@ -742,7 +745,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream pb.ServiceIn
 		return err
 	}
 	domainProject := util.ParseDomainProject(stream.Context())
-	watcher := nf.NewInstanceWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/")
+	watcher := nf.NewInstanceListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil)
 	err = nf.GetNotifyService().AddSubscriber(watcher)
 	util.Logger().Infof("start watch instance status, watcher %s %s", watcher.Subject(), watcher.Id())
 	return nf.HandleWatchJob(watcher, stream, nf.GetNotifyService().Config.NotifyTimeout)
@@ -754,7 +757,7 @@ func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstan
 		nf.EstablishWebSocketError(conn, err)
 		return
 	}
-	nf.DoWebSocketWatch(ctx, in.SelfServiceId, conn)
+	nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
 }
 
 func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 4fe47d29..6663ab99 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -359,11 +359,6 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
 			nuoMultilCount++
 		}
 
-		serviceRst := &pb.DelServicesRspInfo{
-			ServiceId:  serviceId,
-			ErrMessage: "",
-		}
-
 		//检查服务ID合法性
 		in := &pb.DeleteServiceRequest{
 			ServiceId: serviceId,
@@ -372,22 +367,15 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
 		err := apt.Validate(in)
 		if err != nil {
 			util.Logger().Errorf(err, "delete micro-service failed, serviceId is %s: invalid parameters.", in.ServiceId)
-			serviceRst.ErrMessage = err.Error()
-			serviceRespChan <- serviceRst
+			serviceRespChan <- &pb.DelServicesRspInfo{
+				ServiceId:  serviceId,
+				ErrMessage: err.Error(),
+			}
 			continue
 		}
 
 		//执行删除服务操作
-		go func(serviceItem string) {
-			resp, err := s.DeleteServicePri(ctx, serviceItem, request.Force)
-			if err != nil {
-				serviceRst.ErrMessage = err.Error()
-			} else if resp.Code != pb.Response_SUCCESS {
-				serviceRst.ErrMessage = resp.Message
-			}
-
-			serviceRespChan <- serviceRst
-		}(serviceId)
+		util.Go(s.getDeleteServiceFunc(ctx, serviceId, request.Force, serviceRespChan))
 	}
 
 	//获取批量删除服务的结果
@@ -419,6 +407,23 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
 	return resp, nil
 }
 
+func (s *MicroServiceService) getDeleteServiceFunc(ctx context.Context, serviceId string, force bool, serviceRespChan chan<- *pb.DelServicesRspInfo) func(context.Context) {
+	return func(_ context.Context) {
+		serviceRst := &pb.DelServicesRspInfo{
+			ServiceId:  serviceId,
+			ErrMessage: "",
+		}
+		resp, err := s.DeleteServicePri(ctx, serviceId, force)
+		if err != nil {
+			serviceRst.ErrMessage = err.Error()
+		} else if resp.Code != pb.Response_SUCCESS {
+			serviceRst.ErrMessage = resp.Message
+		}
+
+		serviceRespChan <- serviceRst
+	}
+}
+
 func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceRequest) (*pb.GetServiceResponse, error) {
 	if in == nil || len(in.ServiceId) == 0 {
 		return &pb.GetServiceResponse{
@@ -437,7 +442,7 @@ func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceReque
 	service, err := serviceUtil.GetService(ctx, domainProject, in.ServiceId)
 
 	if err != nil {
-		util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err,get service failed.", in.ServiceId)
+		util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err, get service failed.", in.ServiceId)
 		return &pb.GetServiceResponse{
 			Response: pb.CreateResponse(scerr.ErrInternal, "Get service file failed."),
 		}, err
@@ -655,7 +660,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
 	//create rules
 	if in.Rules != nil && len(in.Rules) != 0 {
 		chanLen++
-		go func() {
+		util.Go(func(_ context.Context) {
 			req := &pb.AddServiceRulesRequest{
 				ServiceId: serviceId,
 				Rules:     in.Rules,
@@ -670,12 +675,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
 				chanRsp.Message = rsp.Response.Message
 			}
 			createRespChan <- chanRsp
-		}()
+		})
 	}
 	//create tags
 	if in.Tags != nil && len(in.Tags) != 0 {
 		chanLen++
-		go func() {
+		util.Go(func(_ context.Context) {
 			req := &pb.AddServiceTagsRequest{
 				ServiceId: serviceId,
 				Tags:      in.Tags,
@@ -690,12 +695,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
 				chanRsp.Message = rsp.Response.Message
 			}
 			createRespChan <- chanRsp
-		}()
+		})
 	}
 	// create instance
 	if in.Instances != nil && len(in.Instances) != 0 {
 		chanLen++
-		go func() {
+		util.Go(func(_ context.Context) {
 			chanRsp := &pb.Response{}
 			for _, ins := range in.Instances {
 				req := &pb.RegisterInstanceRequest{
@@ -711,7 +716,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
 				}
 				createRespChan <- chanRsp
 			}
-		}()
+		})
 	}
 
 	// handle result
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index 8a340760..de0e2c66 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -19,6 +19,7 @@ package notification
 import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"golang.org/x/net/context"
 	"time"
 )
 
@@ -44,10 +45,10 @@ func (w *ListWatcher) OnAccept() {
 	}
 
 	util.Logger().Debugf("accepted by notify service, %s watcher %s %s", w.Type(), w.Id(), w.Subject())
-	go w.listAndPublishJobs()
+	util.Go(w.listAndPublishJobs)
 }
 
-func (w *ListWatcher) listAndPublishJobs() {
+func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
 	defer close(w.listCh)
 	if w.ListFunc == nil {
 		return
@@ -112,10 +113,6 @@ func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, resp
 	}
 }
 
-func NewWatcher(nType NotifyType, id string, subject string) *ListWatcher {
-	return NewListWatcher(nType, id, subject, nil)
-}
-
 func NewListWatcher(nType NotifyType, id string, subject string,
 	listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
 	watcher := &ListWatcher{
diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go
index e2d78640..63f78a27 100644
--- a/server/service/notification/notification_service.go
+++ b/server/service/notification/notification_service.go
@@ -20,6 +20,7 @@ import (
 	"container/list"
 	"errors"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	"golang.org/x/net/context"
 	"sync"
 	"time"
 )
@@ -33,7 +34,8 @@ var notifyService *NotifyService
 
 func init() {
 	notifyService = &NotifyService{
-		isClose: true,
+		isClose:   true,
+		goroutine: util.NewGo(context.Background()),
 	}
 }
 
@@ -46,13 +48,14 @@ type serviceIndex map[NotifyType]subscriberSubjectIndex
 type NotifyService struct {
 	Config NotifyServiceConfig
 
-	services serviceIndex
-	queues   map[NotifyType]chan NotifyJob
-	waits    sync.WaitGroup
-	mutexes  map[NotifyType]*sync.Mutex
-	err      chan error
-	closeMux sync.RWMutex
-	isClose  bool
+	services  serviceIndex
+	queues    map[NotifyType]chan NotifyJob
+	waits     sync.WaitGroup
+	mutexes   map[NotifyType]*sync.Mutex
+	err       chan error
+	closeMux  sync.RWMutex
+	isClose   bool
+	goroutine *util.GoRoutine
 }
 
 func (s *NotifyService) Err() <-chan error {
@@ -150,41 +153,52 @@ func (s *NotifyService) AddJob(job NotifyJob) error {
 	}
 }
 
-func (s *NotifyService) publish2Subscriber(t NotifyType) {
-	defer s.waits.Done()
-	for job := range s.queues[t] {
-		util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s",
-			job.Type(), job.Subject(), job.SubscriberId())
+func (s *NotifyService) getPublish2SubscriberFunc(t NotifyType) func(context.Context) {
+	return func(ctx context.Context) {
+		defer s.waits.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case job, ok := <-s.queues[t]:
+				if !ok {
+					return
+				}
 
-		s.mutexes[t].Lock()
+				util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s",
+					job.Type(), job.Subject(), job.SubscriberId())
 
-		if s.Closed() && len(s.services[t]) == 0 {
-			s.mutexes[t].Unlock()
-			return
-		}
+				s.mutexes[t].Lock()
 
-		m, ok := s.services[t][job.Subject()]
-		if ok {
-			// publish的subject如果带上id,则单播,否则广播
-			if len(job.SubscriberId()) != 0 {
-				ns, ok := m[job.SubscriberId()]
+				if s.Closed() && len(s.services[t]) == 0 {
+					s.mutexes[t].Unlock()
+					return
+				}
+
+				m, ok := s.services[t][job.Subject()]
 				if ok {
-					for n := ns.Front(); n != nil; n = n.Next() {
-						n.Value.(Subscriber).OnMessage(job)
+					// publish的subject如果带上id,则单播,否则广播
+					if len(job.SubscriberId()) != 0 {
+						ns, ok := m[job.SubscriberId()]
+						if ok {
+							for n := ns.Front(); n != nil; n = n.Next() {
+								n.Value.(Subscriber).OnMessage(job)
+							}
+						}
+						s.mutexes[t].Unlock()
+						continue
+					}
+					for key := range m {
+						ns := m[key]
+						for n := ns.Front(); n != nil; n = n.Next() {
+							n.Value.(Subscriber).OnMessage(job)
+						}
 					}
 				}
+
 				s.mutexes[t].Unlock()
-				continue
-			}
-			for key := range m {
-				ns := m[key]
-				for n := ns.Front(); n != nil; n = n.Next() {
-					n.Value.(Subscriber).OnMessage(job)
-				}
 			}
 		}
-
-		s.mutexes[t].Unlock()
 	}
 }
 
@@ -227,7 +241,7 @@ func (s *NotifyService) Start() {
 	util.Logger().Debugf("notify service is started with config %s", s.Config)
 
 	for i := NotifyType(0); i != typeEnd; i++ {
-		go s.publish2Subscriber(i)
+		s.goroutine.Do(s.getPublish2SubscriberFunc(i))
 	}
 }
 
@@ -255,6 +269,8 @@ func (s *NotifyService) Stop() {
 
 	close(s.err)
 
+	s.goroutine.Close(true)
+
 	util.Logger().Debug("notify service stopped.")
 }
 
diff --git a/server/service/notification/watch_util.go b/server/service/notification/watch_util.go
index c938719c..9a7e26c5 100644
--- a/server/service/notification/watch_util.go
+++ b/server/service/notification/watch_util.go
@@ -62,6 +62,7 @@ type WebSocketHandler struct {
 	watcher         *ListWatcher
 	needPingWatcher bool
 	closed          chan struct{}
+	goroutine       *util.GoRoutine
 }
 
 func (wh *WebSocketHandler) Init() error {
@@ -101,7 +102,7 @@ func (wh *WebSocketHandler) websocketHeartbeat(messageType int) error {
 	return nil
 }
 
-func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() {
+func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage(ctx context.Context) {
 	defer close(wh.closed)
 
 	remoteAddr := wh.conn.RemoteAddr().String()
@@ -128,17 +129,23 @@ func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() {
 	})
 
 	for {
-		_, _, err := wh.conn.ReadMessage()
-		if err != nil {
-			wh.watcher.SetError(err)
+		select {
+		case <-ctx.Done():
 			return
+		default:
+			_, _, err := wh.conn.ReadMessage()
+			if err != nil {
+				wh.watcher.SetError(err)
+				return
+			}
 		}
 	}
 }
 
 func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
-	remoteAddr := wh.conn.RemoteAddr().String()
+	wh.goroutine.Do(wh.HandleWatchWebSocketControlMessage)
 
+	remoteAddr := wh.conn.RemoteAddr().String()
 	for {
 		select {
 		case <-wh.closed:
@@ -224,8 +231,10 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
 }
 
 func (wh *WebSocketHandler) Close(code int, text string) error {
+	defer wh.goroutine.Close(true)
+
 	remoteAddr := wh.conn.RemoteAddr().String()
-	message := []byte{}
+	var message []byte
 	if code != websocket.CloseNoStatusReceived {
 		message = websocket.FormatCloseMessage(code, text)
 	}
@@ -238,18 +247,6 @@ func (wh *WebSocketHandler) Close(code int, text string) error {
 	return nil
 }
 
-func DoWebSocketWatch(ctx context.Context, serviceId string, conn *websocket.Conn) {
-	domainProject := util.ParseDomainProject(ctx)
-	handler := &WebSocketHandler{
-		ctx:             ctx,
-		conn:            conn,
-		watcher:         NewInstanceWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/"),
-		needPingWatcher: true,
-		closed:          make(chan struct{}),
-	}
-	processHandler(handler)
-}
-
 func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
 	domainProject := util.ParseDomainProject(ctx)
 	handler := &WebSocketHandler{
@@ -258,6 +255,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]
 		watcher:         NewInstanceListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
 		needPingWatcher: true,
 		closed:          make(chan struct{}),
+		goroutine:       util.NewGo(context.Background()),
 	}
 	processHandler(handler)
 }
@@ -266,7 +264,6 @@ func processHandler(handler *WebSocketHandler) {
 	if err := handler.Init(); err != nil {
 		return
 	}
-	go handler.HandleWatchWebSocketControlMessage()
 	handler.HandleWatchWebSocketJob()
 }
 
@@ -294,10 +291,6 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey
 	}
 }
 
-func NewInstanceWatcher(selfServiceId, instanceRoot string) *ListWatcher {
-	return NewWatcher(INSTANCE, selfServiceId, instanceRoot)
-}
-
 func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
 	return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc)
 }
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 66261163..f0837ee4 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -557,11 +557,10 @@ type Dependency struct {
 
 func (dep *Dependency) RemoveConsumerOfProviderRule() {
 	dep.chanNum++
-	go dep.removeConsumerOfProviderRule()
+	util.Go(dep.removeConsumerOfProviderRule)
 }
 
-func (dep *Dependency) removeConsumerOfProviderRule() {
-	ctx := context.TODO()
+func (dep *Dependency) removeConsumerOfProviderRule(ctx context.Context) {
 	opts := make([]registry.PluginOp, 0, len(dep.removedDependencyRuleList))
 	for _, providerRule := range dep.removedDependencyRuleList {
 		proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule)
@@ -605,11 +604,10 @@ func (dep *Dependency) removeConsumerOfProviderRule() {
 
 func (dep *Dependency) AddConsumerOfProviderRule() {
 	dep.chanNum++
-	go dep.addConsumerOfProviderRule()
+	util.Go(dep.addConsumerOfProviderRule)
 }
 
-func (dep *Dependency) addConsumerOfProviderRule() {
-	ctx := context.TODO()
+func (dep *Dependency) addConsumerOfProviderRule(ctx context.Context) {
 	opts := []registry.PluginOp{}
 	for _, providerRule := range dep.NewDependencyRuleList {
 		proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message