pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: [go-client] integrated `logrus`, `testify` and `go mod` for pulsar-client-go (#3443)
Date Tue, 29 Jan 2019 04:42:56 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b74640  [go-client] integrated `logrus`, `testify` and `go mod` for pulsar-client-go (#3443)
8b74640 is described below

commit 8b746402a94f0a57f53eedbb174fcaff33edca1d
Author: 冉小龙 <rxl5555555@qq.com>
AuthorDate: Tue Jan 29 12:42:51 2019 +0800

    [go-client] integrated `logrus`, `testify` and `go mod` for pulsar-client-go (#3443)
    
    * [go-clint]add producer.GetLastSequenceID in Go client
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * fix comments and use LastSequenceID instead of GetLastSequenceID
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * fix consumer and producer test error
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * [pulsar-client-go]provide setSequenceID interface
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * add test case for setSequenceID
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * rename ID -> SequenceID
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * support issue #3417
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    * fix some issues about log and go mod
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
---
 pulsar-client-go/.gitignore                        |  11 +
 .../consumer-listener/consumer-listener.go         |   2 +-
 pulsar-client-go/examples/consumer/consumer.go     |   2 +-
 pulsar-client-go/examples/producer/producer.go     |   3 +-
 pulsar-client-go/examples/reader/reader.go         |   2 +-
 pulsar-client-go/go.mod                            |   9 +
 pulsar-client-go/go.sum                            |  26 +++
 pulsar-client-go/logutil/log.go                    | 222 +++++++++++++++++++++
 pulsar-client-go/pulsar/c_client.go                |  13 +-
 pulsar-client-go/pulsar/client.go                  |   8 +-
 pulsar-client-go/pulsar/client_test.go             |  31 +--
 pulsar-client-go/pulsar/consumer_test.go           | 103 +++++-----
 pulsar-client-go/pulsar/logger.go                  |  47 -----
 pulsar-client-go/pulsar/producer_test.go           |  53 ++---
 pulsar-client-go/pulsar/reader_test.go             |  71 ++++---
 pulsar-client-go/pulsar/util_test.go               |  29 +--
 16 files changed, 419 insertions(+), 213 deletions(-)

diff --git a/pulsar-client-go/.gitignore b/pulsar-client-go/.gitignore
new file mode 100644
index 0000000..6b6998a
--- /dev/null
+++ b/pulsar-client-go/.gitignore
@@ -0,0 +1,11 @@
+# IDE generated files
+.idea/
+
+# dep generated files
+vendor
+
+# Mac swap file
+*.DS_Store
+
+# Linux swap file
+*.swp
diff --git a/pulsar-client-go/examples/consumer-listener/consumer-listener.go b/pulsar-client-go/examples/consumer-listener/consumer-listener.go
index 9320951..0adc82e 100644
--- a/pulsar-client-go/examples/consumer-listener/consumer-listener.go
+++ b/pulsar-client-go/examples/consumer-listener/consumer-listener.go
@@ -21,8 +21,8 @@ package main
 
 import (
 	"fmt"
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
 	"github.com/apache/pulsar/pulsar-client-go/pulsar"
-	"log"
 )
 
 func main() {
diff --git a/pulsar-client-go/examples/consumer/consumer.go b/pulsar-client-go/examples/consumer/consumer.go
index e8b4f5b..5b5d7c1 100644
--- a/pulsar-client-go/examples/consumer/consumer.go
+++ b/pulsar-client-go/examples/consumer/consumer.go
@@ -22,8 +22,8 @@ package main
 import (
 	"context"
 	"fmt"
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
 	"github.com/apache/pulsar/pulsar-client-go/pulsar"
-	"log"
 )
 
 func main() {
diff --git a/pulsar-client-go/examples/producer/producer.go b/pulsar-client-go/examples/producer/producer.go
index 0d8e07e..58e1ffa 100644
--- a/pulsar-client-go/examples/producer/producer.go
+++ b/pulsar-client-go/examples/producer/producer.go
@@ -22,8 +22,9 @@ package main
 import (
 	"context"
 	"fmt"
+
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
 	"github.com/apache/pulsar/pulsar-client-go/pulsar"
-	"log"
 )
 
 func main() {
diff --git a/pulsar-client-go/examples/reader/reader.go b/pulsar-client-go/examples/reader/reader.go
index d1ccaa7..e6a67e6 100644
--- a/pulsar-client-go/examples/reader/reader.go
+++ b/pulsar-client-go/examples/reader/reader.go
@@ -22,8 +22,8 @@ package main
 import (
 	"context"
 	"fmt"
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
 	"github.com/apache/pulsar/pulsar-client-go/pulsar"
-	"log"
 )
 
 func main() {
diff --git a/pulsar-client-go/go.mod b/pulsar-client-go/go.mod
new file mode 100644
index 0000000..1d826eb
--- /dev/null
+++ b/pulsar-client-go/go.mod
@@ -0,0 +1,9 @@
+module github.com/apache/pulsar/pulsar-client-go
+
+require (
+	github.com/BurntSushi/toml v0.3.1 // indirect
+	github.com/sirupsen/logrus v1.3.0
+	github.com/stretchr/testify v1.3.0
+	gopkg.in/natefinch/lumberjack.v2 v2.0.0
+	gopkg.in/yaml.v2 v2.2.2 // indirect
+)
diff --git a/pulsar-client-go/go.sum b/pulsar-client-go/go.sum
new file mode 100644
index 0000000..5f0cd6b
--- /dev/null
+++ b/pulsar-client-go/go.sum
@@ -0,0 +1,26 @@
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
+github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pulsar-client-go/logutil/log.go b/pulsar-client-go/logutil/log.go
new file mode 100644
index 0000000..3ee4a78
--- /dev/null
+++ b/pulsar-client-go/logutil/log.go
@@ -0,0 +1,222 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package logutil
+
+import (
+	"bytes"
+	"fmt"
+	"path"
+	"runtime"
+	"sort"
+	"strings"
+
+	log "github.com/sirupsen/logrus"
+	"gopkg.in/natefinch/lumberjack.v2"
+)
+
+type LoggerLevel int
+
+const (
+	DEBUG LoggerLevel = iota
+	INFO
+	WARN
+	ERROR
+)
+
+func (l LoggerLevel) String() string {
+	switch l {
+	case DEBUG:
+		return "DEBUG"
+	case INFO:
+		return "INFO"
+	case WARN:
+		return "WARN"
+	case ERROR:
+		return "ERROR"
+
+	default:
+		return fmt.Sprintf("UNKNOWN: %d", l)
+	}
+}
+
+const (
+	defaultLogLevel      = log.InfoLevel
+	defaultLogTimeFormat = "2006/01/02 15:04:05.000"
+)
+
+func Info(v ...interface{}) {
+	log.Info(v...)
+}
+
+func Infof(format string, v ...interface{}) {
+	log.Infof(format, v...)
+}
+
+func Debug(v ...interface{}) {
+	log.Debug(v...)
+}
+
+func Debugf(format string, v ...interface{}) {
+	log.Debugf(format, v...)
+}
+
+func Warn(v ...interface{}) {
+	log.Warn(v...)
+}
+
+func Warnf(format string, v ...interface{}) {
+	log.Warnf(format, v...)
+}
+
+func Error(v ...interface{}) {
+	log.Error(v...)
+}
+
+func Errorf(format string, v ...interface{}) {
+	log.Errorf(format, v...)
+}
+
+func Fatal(v ...interface{}) {
+	log.Fatal(v...)
+}
+
+func Fatalf(format string, v ...interface{}) {
+	log.Fatalf(format, v...)
+}
+
+func stringToLogLevel(level string) log.Level {
+	switch strings.ToLower(level) {
+	case "fatal":
+		return log.FatalLevel
+	case "error":
+		return log.ErrorLevel
+	case "warn", "warning":
+		return log.WarnLevel
+	case "debug":
+		return log.DebugLevel
+	case "info":
+		return log.InfoLevel
+	}
+	return defaultLogLevel
+}
+
+// hook injects file name and line pos into log entry.
+type hook struct{}
+
+// Fire implements logrus.Hook interface
+func (h *hook) Fire(entry *log.Entry) error {
+	// these two num are set by manually testing
+	pc := make([]uintptr, 4)
+	cnt := runtime.Callers(10, pc)
+
+	for i := 0; i < cnt; i++ {
+		fu := runtime.FuncForPC(pc[i] - 1)
+		name := fu.Name()
+		if !isSkippedPackageName(name) {
+			file, line := fu.FileLine(pc[i] - 1)
+			entry.Data["file"] = path.Base(file)
+			entry.Data["line"] = line
+			break
+		}
+	}
+	return nil
+}
+
+// Levels implements logrus.Hook interface.
+func (h *hook) Levels() []log.Level {
+	return log.AllLevels
+}
+
+// isSKippedPackageName tests wether path name is on log library calling stack.
+func isSkippedPackageName(name string) bool {
+	return strings.Contains(name, "github.com/sirupsen/logrus") ||
+		strings.Contains(name, "github.com/apache/pulsar/pulsar-client-go/logutil")
+}
+
+// formatter is for compatibility with ngaut/log
+type formatter struct {
+	DisableTimestamp, EnableEntryOrder bool
+}
+
+// Format implements logrus.Formatter
+func (f *formatter) Format(entry *log.Entry) ([]byte, error) {
+	var b *bytes.Buffer
+	if entry.Buffer != nil {
+		b = entry.Buffer
+	} else {
+		b = &bytes.Buffer{}
+	}
+
+	if !f.DisableTimestamp {
+		fmt.Fprintf(b, "%s ", entry.Time.Format(defaultLogTimeFormat))
+	}
+	if file, ok := entry.Data["file"]; ok {
+		fmt.Fprintf(b, "%s:%v:", file, entry.Data["line"])
+	}
+	fmt.Fprintf(b, " [%s] %s", entry.Level.String(), entry.Message)
+
+	if f.EnableEntryOrder {
+		keys := make([]string, 0, len(entry.Data))
+		for k := range entry.Data {
+			if k != "file" && k != "line" {
+				keys = append(keys, k)
+			}
+		}
+		sort.Strings(keys)
+		for _, k := range keys {
+			fmt.Fprintf(b, " %v=%v", k, entry.Data[k])
+		}
+	} else {
+		for k, v := range entry.Data {
+			if k != "file" && k != "line" {
+				fmt.Fprintf(b, " %v=%v", k, v)
+			}
+		}
+	}
+
+	b.WriteByte('\n')
+
+	return b.Bytes(), nil
+}
+
+// SetLevel sets log's level by a level string.
+func SetLevel(level string) {
+	log.SetLevel(stringToLogLevel(level))
+}
+
+// GetLevel gets current log's level as a level string.
+func GetLevel() string {
+	return log.GetLevel().String()
+}
+
+// SetOutput sets the filename for the log.
+func SetOutput(filename string) {
+	output := &lumberjack.Logger{
+		Filename:  filename,
+		LocalTime: true,
+	}
+	log.SetOutput(output)
+}
+
+func init() {
+	log.SetLevel(defaultLogLevel)
+	log.SetFormatter(&formatter{})
+	log.AddHook(&hook{})
+}
diff --git a/pulsar-client-go/pulsar/c_client.go b/pulsar-client-go/pulsar/c_client.go
index 5787260..cee708d 100644
--- a/pulsar-client-go/pulsar/c_client.go
+++ b/pulsar-client-go/pulsar/c_client.go
@@ -25,17 +25,18 @@ package pulsar
 */
 import "C"
 import (
-	"log"
 	"runtime"
 	"strings"
 	"unsafe"
+
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
 )
 
 //export pulsarClientLoggerProxy
 func pulsarClientLoggerProxy(level C.pulsar_logger_level_t, file *C.char, line C.int, message *C.char, ctx unsafe.Pointer) {
-	logger := restorePointerNoDelete(ctx).(func(LoggerLevel, string, int, string))
+	logger := restorePointerNoDelete(ctx).(func(log.LoggerLevel, string, int, string))
 
-	logger(LoggerLevel(level), C.GoString(file), int(line), C.GoString(message))
+	logger(log.LoggerLevel(level), C.GoString(file), int(line), C.GoString(message))
 }
 
 func newClient(options ClientOptions) (Client, error) {
@@ -63,8 +64,8 @@ func newClient(options ClientOptions) (Client, error) {
 
 	if options.Logger == nil {
 		// Configure a default logger with same date format as Go logs
-		options.Logger = func(level LoggerLevel, file string, line int, message string) {
-			log.Printf("%-5s | %s:%d | %s", level, file, line, message)
+		options.Logger = func(level log.LoggerLevel, file string, line int, message string) {
+			log.Infof("%-5s | %s:%d | %s", level, file, line, message)
 		}
 	}
 
@@ -144,7 +145,7 @@ func pulsarClientTokenSupplierProxy(ctx unsafe.Pointer) *C.char {
 	tokenSupplier := restorePointerNoDelete(ctx).(func() string)
 	token := tokenSupplier()
 	// The C string will be freed from within the C wrapper itself
-	return C.CString(token);
+	return C.CString(token)
 }
 
 func newAuthenticationTokenSupplier(tokenSupplier func() string) Authentication {
diff --git a/pulsar-client-go/pulsar/client.go b/pulsar-client-go/pulsar/client.go
index ed07fbb..46910c1 100644
--- a/pulsar-client-go/pulsar/client.go
+++ b/pulsar-client-go/pulsar/client.go
@@ -19,7 +19,11 @@
 
 package pulsar
 
-import "time"
+import (
+	"time"
+
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
+)
 
 func NewClient(options ClientOptions) (Client, error) {
 	return newClient(options)
@@ -74,7 +78,7 @@ type ClientOptions struct {
 	// By default, log messages will be printed on standard output. By passing a logger function, application
 	// can determine how to print logs. This function will be called each time the Pulsar client library wants
 	// to write any logs.
-	Logger func(level LoggerLevel, file string, line int, message string)
+	Logger func(level log.LoggerLevel, file string, line int, message string)
 
 	// Set the path to the trusted TLS certificate file
 	TLSTrustCertsFilePath string
diff --git a/pulsar-client-go/pulsar/client_test.go b/pulsar-client-go/pulsar/client_test.go
index 61e4770..3fb2819 100644
--- a/pulsar-client-go/pulsar/client_test.go
+++ b/pulsar-client-go/pulsar/client_test.go
@@ -22,16 +22,17 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"github.com/stretchr/testify/assert"
 	"io/ioutil"
 	"testing"
 )
 
 func TestGetTopicPartitions(t *testing.T) {
 	client, err := NewClient(ClientOptions{
-		URL:                      "pulsar://localhost:6650",
+		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	// Create topic with 5 partitions
@@ -41,10 +42,10 @@ func TestGetTopicPartitions(t *testing.T) {
 	partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
 
 	partitions, err := client.TopicPartitions(partitionedTopic)
-	assertNil(t, err)
-	assertEqual(t, len(partitions), 5)
+	assert.Nil(t, err)
+	assert.Equal(t, len(partitions), 5)
 	for i := 0; i < 5; i++ {
-		assertEqual(t, partitions[i],
+		assert.Equal(t, partitions[i],
 			fmt.Sprintf("%s-partition-%d", partitionedTopic, i))
 	}
 
@@ -52,16 +53,16 @@ func TestGetTopicPartitions(t *testing.T) {
 	topic := "persistent://public/default/TestGetTopicPartitions-nopartitions"
 
 	partitions, err = client.TopicPartitions(topic)
-	assertNil(t, err)
-	assertEqual(t, len(partitions), 1)
-	assertEqual(t, partitions[0], topic)
+	assert.Nil(t, err)
+	assert.Equal(t, len(partitions), 1)
+	assert.Equal(t, partitions[0], topic)
 }
 
 const TestTokenFilePath = "/tmp/pulsar-test-data/certs/token.txt"
 
 func readToken(t *testing.T) string {
 	data, err := ioutil.ReadFile(TestTokenFilePath)
-	assertNil(t, err)
+	assert.Nil(t, err)
 
 	return string(data)
 }
@@ -72,7 +73,7 @@ func TestTokenAuth(t *testing.T) {
 		Authentication: NewAuthenticationToken(readToken(t)),
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	topic := "persistent://private/auth/TestTokenAuth"
@@ -81,7 +82,7 @@ func TestTokenAuth(t *testing.T) {
 		Topic: topic,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
 	ctx := context.Background()
@@ -97,13 +98,13 @@ func TestTokenAuth(t *testing.T) {
 
 func TestTokenAuthSupplier(t *testing.T) {
 	client, err := NewClient(ClientOptions{
-		URL:            "pulsar://localhost:6650",
-		Authentication: NewAuthenticationTokenSupplier(func () string {
+		URL: "pulsar://localhost:6650",
+		Authentication: NewAuthenticationTokenSupplier(func() string {
 			return readToken(t)
 		}),
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	topic := "persistent://private/auth/TestTokenAuth"
@@ -112,7 +113,7 @@ func TestTokenAuthSupplier(t *testing.T) {
 		Topic: topic,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
 	ctx := context.Background()
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 8a46599..5b865a4 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -22,6 +22,7 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"github.com/stretchr/testify/assert"
 	"io/ioutil"
 	"net/http"
 	"strings"
@@ -34,7 +35,7 @@ func TestConsumerConnectError(t *testing.T) {
 		URL: "pulsar://invalid-hostname:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 
 	defer client.Close()
 
@@ -44,10 +45,10 @@ func TestConsumerConnectError(t *testing.T) {
 	})
 
 	// Expect error in creating consumer
-	assertNil(t, consumer)
-	assertNotNil(t, err)
+	assert.Nil(t, consumer)
+	assert.NotNil(t, err)
 
-	assertEqual(t, err.(*Error).Result(), ConnectError);
+	assert.Equal(t, err.(*Error).Result(), ConnectError);
 }
 
 func TestConsumer(t *testing.T) {
@@ -55,14 +56,14 @@ func TestConsumer(t *testing.T) {
 		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic: "my-topic",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
 	consumer, err := client.Subscribe(ConsumerOptions{
@@ -75,11 +76,11 @@ func TestConsumer(t *testing.T) {
 		Type:                                      Shared,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer consumer.Close()
 
-	assertEqual(t, consumer.Topic(), "persistent://public/default/my-topic")
-	assertEqual(t, consumer.Subscription(), "my-sub")
+	assert.Equal(t, consumer.Topic(), "persistent://public/default/my-topic")
+	assert.Equal(t, consumer.Subscription(), "my-sub")
 
 	ctx := context.Background()
 
@@ -91,11 +92,11 @@ func TestConsumer(t *testing.T) {
 		}
 
 		msg, err := consumer.Receive(ctx)
-		assertNil(t, err)
-		assertNotNil(t, msg)
+		assert.Nil(t, err)
+		assert.NotNil(t, msg)
 
-		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
-		assertEqual(t, string(msg.Topic()), "persistent://public/default/my-topic")
+		assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+		assert.Equal(t, string(msg.Topic()), "persistent://public/default/my-topic")
 
 		consumer.Ack(msg)
 	}
@@ -108,7 +109,7 @@ func TestConsumerCompaction(t *testing.T) {
 		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	topic := fmt.Sprintf("my-compaction-topic-%d", time.Now().Unix())
@@ -117,7 +118,7 @@ func TestConsumerCompaction(t *testing.T) {
 		Topic: topic,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
 	// Pre-create both subscriptions to retain published messages
@@ -126,7 +127,7 @@ func TestConsumerCompaction(t *testing.T) {
 		SubscriptionName: "my-sub-1",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	consumer1.Close()
 
 	consumer2, err := client.Subscribe(ConsumerOptions{
@@ -135,7 +136,7 @@ func TestConsumerCompaction(t *testing.T) {
 		ReadCompacted:    true,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	consumer2.Close()
 
 	ctx := context.Background()
@@ -159,7 +160,7 @@ func TestConsumerCompaction(t *testing.T) {
 			time.Sleep(100 * time.Millisecond)
 			continue
 		} else {
-			assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+			assert.Equal(t, strings.Contains(res, "SUCCESS"), true)
 			fmt.Println("Compaction is done")
 			break
 		}
@@ -172,7 +173,7 @@ func TestConsumerCompaction(t *testing.T) {
 		SubscriptionName: "my-sub-1",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer consumer1.Close()
 
 	consumer2, err = client.Subscribe(ConsumerOptions{
@@ -181,31 +182,31 @@ func TestConsumerCompaction(t *testing.T) {
 		ReadCompacted:    true,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer consumer2.Close()
 
 	// Consumer-1 will receive all messages
 	for i := 0; i < 10; i++ {
 		msg, err := consumer1.Receive(context.Background())
-		assertNil(t, err)
-		assertNotNil(t, msg)
+		assert.Nil(t, err)
+		assert.NotNil(t, msg)
 
-		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+		assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
 	}
 
 	// Consumer-2 will only receive the last message
 	msg, err := consumer2.Receive(context.Background())
-	assertNil(t, err)
-	assertNotNil(t, msg)
-	assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+	assert.Nil(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
 
 	// No more messages on consumer-2
 	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer cancel()
 
 	msg, err = consumer2.Receive(ctx)
-	assertNil(t, msg)
-	assertNotNil(t, err)
+	assert.Nil(t, msg)
+	assert.NotNil(t, err)
 }
 
 func TestConsumerWithInvalidConf(t *testing.T) {
@@ -225,20 +226,20 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 	})
 
 	// Expect error in creating cosnumer
-	assertNil(t, consumer)
-	assertNotNil(t, err)
+	assert.Nil(t, consumer)
+	assert.NotNil(t, err)
 
-	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
+	assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
 
 	consumer, err = client.Subscribe(ConsumerOptions{
 		SubscriptionName: "my-subscription",
 	})
 
 	// Expect error in creating consumer
-	assertNil(t, consumer)
-	assertNotNil(t, err)
+	assert.Nil(t, consumer)
+	assert.NotNil(t, err)
 
-	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
+	assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
 }
 
 func makeHttpPutCall(t *testing.T, url string) string {
@@ -278,20 +279,20 @@ func TestConsumerMultiTopics(t *testing.T) {
 		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	producer1, err := client.CreateProducer(ProducerOptions{
 		Topic: "multi-topic-1",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 
 	producer2, err := client.CreateProducer(ProducerOptions{
 		Topic: "multi-topic-2",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer1.Close()
 	defer producer2.Close()
 
@@ -300,10 +301,10 @@ func TestConsumerMultiTopics(t *testing.T) {
 		SubscriptionName: "my-sub",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer consumer.Close()
 
-	assertEqual(t, consumer.Subscription(), "my-sub")
+	assert.Equal(t, consumer.Subscription(), "my-sub")
 
 	ctx := context.Background()
 
@@ -314,7 +315,7 @@ func TestConsumerMultiTopics(t *testing.T) {
 		}); err != nil {
 			t.Fatal(err)
 		}
-		assertEqual(t, producer1.LastSequenceID(), int64(3))
+		assert.Equal(t, producer1.LastSequenceID(), int64(3))
 
 		if err := producer2.Send(ctx, ProducerMessage{
 			Payload:    []byte(fmt.Sprintf("hello-%d", i)),
@@ -322,13 +323,13 @@ func TestConsumerMultiTopics(t *testing.T) {
 		}); err != nil {
 			t.Fatal(err)
 		}
-		assertEqual(t, producer2.LastSequenceID(), int64(i))
+		assert.Equal(t, producer2.LastSequenceID(), int64(i))
 	}
 
 	for i := 0; i < 20; i++ {
 		msg, err := consumer.Receive(ctx)
-		assertNil(t, err)
-		assertNotNil(t, msg)
+		assert.Nil(t, err)
+		assert.NotNil(t, msg)
 
 		consumer.Ack(msg)
 	}
@@ -341,20 +342,20 @@ func TestConsumerRegex(t *testing.T) {
 		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	producer1, err := client.CreateProducer(ProducerOptions{
 		Topic: "topic-1",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 
 	producer2, err := client.CreateProducer(ProducerOptions{
 		Topic: "topic-2",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer1.Close()
 	defer producer2.Close()
 
@@ -363,10 +364,10 @@ func TestConsumerRegex(t *testing.T) {
 		SubscriptionName: "my-sub",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer consumer.Close()
 
-	assertEqual(t, consumer.Subscription(), "my-sub")
+	assert.Equal(t, consumer.Subscription(), "my-sub")
 
 	ctx := context.Background()
 
@@ -386,11 +387,11 @@ func TestConsumerRegex(t *testing.T) {
 
 	for i := 0; i < 20; i++ {
 		msg, err := consumer.Receive(ctx)
-		assertNil(t, err)
-		assertNotNil(t, msg)
+		assert.Nil(t, err)
+		assert.NotNil(t, msg)
 
 		consumer.Ack(msg)
 	}
 
 	consumer.Unsubscribe()
-}
\ No newline at end of file
+}
diff --git a/pulsar-client-go/pulsar/logger.go b/pulsar-client-go/pulsar/logger.go
deleted file mode 100644
index 79734c4..0000000
--- a/pulsar-client-go/pulsar/logger.go
+++ /dev/null
@@ -1,47 +0,0 @@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-
-package pulsar
-
-import "fmt"
-
-type LoggerLevel int
-
-const (
-	DEBUG LoggerLevel = iota
-	INFO
-	WARN
-	ERROR
-)
-
-func (l LoggerLevel) String() string {
-	switch l {
-	case DEBUG:
-		return "DEBUG"
-	case INFO:
-		return "INFO"
-	case WARN:
-		return "WARN"
-	case ERROR:
-		return "ERROR"
-
-	default:
-		return fmt.Sprintf("UNKNOWN: %d", l)
-	}
-}
\ No newline at end of file
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index 613bdb7..95f8fad 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -22,6 +22,7 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"github.com/stretchr/testify/assert"
 	"testing"
 	"time"
 )
@@ -39,7 +40,7 @@ func TestProducerConnectError(t *testing.T) {
 		URL: "pulsar://invalid-hostname:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 
 	defer client.Close()
 
@@ -48,10 +49,10 @@ func TestProducerConnectError(t *testing.T) {
 	})
 
 	// Expect error in creating producer
-	assertNil(t, producer)
-	assertNotNil(t, err)
+	assert.Nil(t, producer)
+	assert.NotNil(t, err)
 
-	assertEqual(t, err.(*Error).Result(), ConnectError)
+	assert.Equal(t, err.(*Error).Result(), ConnectError)
 }
 
 func TestProducer(t *testing.T) {
@@ -64,7 +65,7 @@ func TestProducer(t *testing.T) {
 		MessageListenerThreads:   5,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	producer, err := client.CreateProducer(ProducerOptions{
@@ -83,12 +84,12 @@ func TestProducer(t *testing.T) {
 		},
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
-	assertEqual(t, producer.Topic(), "persistent://public/default/my-topic")
-	assertEqual(t, producer.Name(), "my-producer-name")
-	assertEqual(t, producer.LastSequenceID(), int64(-1))
+	assert.Equal(t, producer.Topic(), "persistent://public/default/my-topic")
+	assert.Equal(t, producer.Name(), "my-producer-name")
+	assert.Equal(t, producer.LastSequenceID(), int64(-1))
 
 	ctx := context.Background()
 
@@ -98,9 +99,9 @@ func TestProducer(t *testing.T) {
 		}); err != nil {
 			t.Fatal(err)
 		}
-		assertEqual(t, producer.LastSequenceID(), int64(i))
+		assert.Equal(t, producer.LastSequenceID(), int64(i))
 	}
-	assertEqual(t, producer.LastSequenceID(), int64(9))
+	assert.Equal(t, producer.LastSequenceID(), int64(9))
 }
 
 func TestProducerNoTopic(t *testing.T) {
@@ -119,10 +120,10 @@ func TestProducerNoTopic(t *testing.T) {
 	})
 
 	// Expect error in creating producer
-	assertNil(t, producer)
-	assertNotNil(t, err)
+	assert.Nil(t, producer)
+	assert.NotNil(t, err)
 
-	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
+	assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
 }
 
 func TestMessageRouter(t *testing.T) {
@@ -134,7 +135,7 @@ func TestMessageRouter(t *testing.T) {
 		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	// Only subscribe on the specific partition
@@ -143,7 +144,7 @@ func TestMessageRouter(t *testing.T) {
 		SubscriptionName: "my-sub",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer consumer.Close()
 
 	producer, err := client.CreateProducer(ProducerOptions{
@@ -154,7 +155,7 @@ func TestMessageRouter(t *testing.T) {
 		},
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
 	ctx := context.Background()
@@ -163,16 +164,16 @@ func TestMessageRouter(t *testing.T) {
 		Payload:    []byte("hello"),
 		SequenceID: 1234,
 	})
-	assertNil(t, err)
-	assertEqual(t, producer.LastSequenceID(), int64(1234))
+	assert.Nil(t, err)
+	assert.Equal(t, producer.LastSequenceID(), int64(1234))
 
 	fmt.Println("PUBLISHED")
 
 	// Verify message was published on partition 2
 	msg, err := consumer.Receive(ctx)
-	assertNil(t, err)
-	assertNotNil(t, msg)
-	assertEqual(t, string(msg.Payload()), "hello")
+	assert.Nil(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, string(msg.Payload()), "hello")
 }
 
 func TestProducerZstd(t *testing.T) {
@@ -180,7 +181,7 @@ func TestProducerZstd(t *testing.T) {
 		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	producer, err := client.CreateProducer(ProducerOptions{
@@ -188,11 +189,11 @@ func TestProducerZstd(t *testing.T) {
 		CompressionType: ZSTD,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
-	assertEqual(t, producer.Topic(), "persistent://public/default/my-topic")
-	assertEqual(t, producer.Name(), "my-producer-name")
+	assert.Equal(t, producer.Topic(), "persistent://public/default/my-topic")
+	assert.Equal(t, producer.Name(), "my-producer-name")
 
 	ctx := context.Background()
 
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index a0a63ae..a7fddb4 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -22,6 +22,7 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"github.com/stretchr/testify/assert"
 	"strings"
 	"testing"
 	"time"
@@ -32,7 +33,7 @@ func TestReaderConnectError(t *testing.T) {
 		URL: "pulsar://invalid-hostname:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 
 	defer client.Close()
 
@@ -42,10 +43,10 @@ func TestReaderConnectError(t *testing.T) {
 	})
 
 	// Expect error in creating reader
-	assertNil(t, reader)
-	assertNotNil(t, err)
+	assert.Nil(t, reader)
+	assert.NotNil(t, err)
 
-	assertEqual(t, err.(*Error).Result(), ConnectError);
+	assert.Equal(t, err.(*Error).Result(), ConnectError);
 }
 
 func TestReader(t *testing.T) {
@@ -53,14 +54,14 @@ func TestReader(t *testing.T) {
 		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic: "my-reader-topic",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
 	reader, err := client.CreateReader(ReaderOptions{
@@ -68,10 +69,10 @@ func TestReader(t *testing.T) {
 		StartMessageID: LatestMessage,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer reader.Close()
 
-	assertEqual(t, reader.Topic(), "persistent://public/default/my-reader-topic")
+	assert.Equal(t, reader.Topic(), "persistent://public/default/my-reader-topic")
 
 	ctx := context.Background()
 
@@ -83,19 +84,19 @@ func TestReader(t *testing.T) {
 		}
 
 		hasNext, err := reader.HasNext()
-		assertNil(t, err)
-		assertEqual(t, hasNext, true)
+		assert.Nil(t, err)
+		assert.Equal(t, hasNext, true)
 
 		msg, err := reader.Next(ctx)
-		assertNil(t, err)
-		assertNotNil(t, msg)
+		assert.Nil(t, err)
+		assert.NotNil(t, msg)
 
-		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+		assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
 	}
 
 	hasNext, err := reader.HasNext()
-	assertNil(t, err)
-	assertEqual(t, hasNext, false)
+	assert.Nil(t, err)
+	assert.Equal(t, hasNext, false)
 }
 
 func TestReaderWithInvalidConf(t *testing.T) {
@@ -115,29 +116,28 @@ func TestReaderWithInvalidConf(t *testing.T) {
 	})
 
 	// Expect error in creating cosnumer
-	assertNil(t, reader)
-	assertNotNil(t, err)
+	assert.Nil(t, reader)
+	assert.NotNil(t, err)
 
-	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
+	assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
 
 	reader, err = client.CreateReader(ReaderOptions{
 		StartMessageID: LatestMessage,
 	})
 
 	// Expect error in creating cosnumer
-	assertNil(t, reader)
-	assertNotNil(t, err)
+	assert.Nil(t, reader)
+	assert.NotNil(t, err)
 
-	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
+	assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
 }
 
-
 func TestReaderCompaction(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: "pulsar://localhost:6650",
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer client.Close()
 
 	topic := fmt.Sprintf("my-reader-compaction-topic-%d", time.Now().Unix())
@@ -146,7 +146,7 @@ func TestReaderCompaction(t *testing.T) {
 		Topic: topic,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer producer.Close()
 
 	ctx := context.Background()
@@ -170,7 +170,7 @@ func TestReaderCompaction(t *testing.T) {
 			time.Sleep(100 * time.Millisecond)
 			continue
 		} else {
-			assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+			assert.Equal(t, strings.Contains(res, "SUCCESS"), true)
 			fmt.Println("Compaction is done")
 			break
 		}
@@ -183,7 +183,7 @@ func TestReaderCompaction(t *testing.T) {
 		StartMessageID: EarliestMessage,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer reader1.Close()
 
 	reader2, err := client.CreateReader(ReaderOptions{
@@ -192,30 +192,29 @@ func TestReaderCompaction(t *testing.T) {
 		ReadCompacted:  true,
 	})
 
-	assertNil(t, err)
+	assert.Nil(t, err)
 	defer reader2.Close()
 
 	// Reader-1 will receive all messages
 	for i := 0; i < 10; i++ {
 		msg, err := reader1.Next(context.Background())
-		assertNil(t, err)
-		assertNotNil(t, msg)
+		assert.Nil(t, err)
+		assert.NotNil(t, msg)
 
-		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+		assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
 	}
 
 	// Reader-2 will only receive the last message
 	msg, err := reader2.Next(context.Background())
-	assertNil(t, err)
-	assertNotNil(t, msg)
-	assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+	assert.Nil(t, err)
+	assert.NotNil(t, msg)
+	assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
 
 	// No more messages on consumer-2
 	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer cancel()
 
 	msg, err = reader2.Next(ctx)
-	assertNil(t, msg)
-	assertNotNil(t, err)
+	assert.Nil(t, msg)
+	assert.NotNil(t, err)
 }
-
diff --git a/pulsar-client-go/pulsar/util_test.go b/pulsar-client-go/pulsar/util_test.go
index 5fdbac2..4498cfc 100644
--- a/pulsar-client-go/pulsar/util_test.go
+++ b/pulsar-client-go/pulsar/util_test.go
@@ -20,35 +20,12 @@
 package pulsar
 
 import (
-	"testing"
-	"runtime"
-	"net/http"
-	"log"
-	"encoding/json"
 	"bytes"
+	"encoding/json"
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
+	"net/http"
 )
 
-func assertNil(t *testing.T, a interface{}) {
-	if a != nil {
-		_, file, line, _ := runtime.Caller(1)
-		t.Fatalf("%s:%d  | Expected nil", file, line)
-	}
-}
-
-func assertNotNil(t *testing.T, a interface{}) {
-	if a == nil {
-		_, file, line, _ := runtime.Caller(1)
-		t.Fatalf("%s:%d  | Expected not nil", file, line)
-	}
-}
-
-func assertEqual(t *testing.T, realValue interface{}, expected interface{}) {
-	if realValue != expected {
-		_, file, line, _ := runtime.Caller(1)
-		t.Fatalf("%s:%d  | Expected '%v' -- Got '%v'", file, line, expected, realValue)
-	}
-}
-
 func httpPut(url string, body interface{}) {
 	client := http.Client{}
 


Mime
View raw message