kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1300777 - in /incubator/kafka/trunk/clients/go: src/consumer.go src/kafka.go src/message.go src/payload_codec.go src/publisher.go src/request.go src/timing.go tools/consumer/consumer.go tools/offsets/offsets.go tools/publisher/publisher.go
Date Wed, 14 Mar 2012 22:58:57 GMT
Author: junrao
Date: Wed Mar 14 22:58:57 2012
New Revision: 1300777

URL: http://svn.apache.org/viewvc?rev=1300777&view=rev
Log:
Update Go Client to new version of Go; patched by AaronR; KAFKA-296

Modified:
    incubator/kafka/trunk/clients/go/src/consumer.go
    incubator/kafka/trunk/clients/go/src/kafka.go
    incubator/kafka/trunk/clients/go/src/message.go
    incubator/kafka/trunk/clients/go/src/payload_codec.go
    incubator/kafka/trunk/clients/go/src/publisher.go
    incubator/kafka/trunk/clients/go/src/request.go
    incubator/kafka/trunk/clients/go/src/timing.go
    incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
    incubator/kafka/trunk/clients/go/tools/offsets/offsets.go
    incubator/kafka/trunk/clients/go/tools/publisher/publisher.go

Modified: incubator/kafka/trunk/clients/go/src/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/consumer.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/consumer.go (original)
+++ incubator/kafka/trunk/clients/go/src/consumer.go Wed Mar 14 22:58:57 2012
@@ -23,11 +23,12 @@
 package kafka
 
 import (
+  "encoding/binary"
+  "errors"
+  "io"
   "log"
-  "os"
   "net"
   "time"
-  "encoding/binary"
 )
 
 type BrokerConsumer struct {
@@ -66,11 +67,11 @@ func NewBrokerOffsetConsumer(hostname st
 func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) {
   // merge to the default map, so one 'could' override the default codecs..
   for k, v := range codecsMap(payloadCodecs) {
-    consumer.codecs[k] = v, true
+    consumer.codecs[k] = v
   }
 }
 
-func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64,
quit chan bool) (int, os.Error) {
+func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64,
quit chan bool) (int, error) {
   conn, err := consumer.broker.connect()
   if err != nil {
     return -1, err
@@ -86,14 +87,14 @@ func (consumer *BrokerConsumer) ConsumeO
       })
 
       if err != nil {
-        if err != os.EOF {
+        if err != io.EOF {
           log.Println("Fatal Error: ", err)
           panic(err)
         }
         quit <- true // force quit
         break
       }
-      time.Sleep(pollTimeoutMs * 1000000)
+      time.Sleep(time.Millisecond * time.Duration(pollTimeoutMs))
     }
     done <- true
   }()
@@ -107,7 +108,7 @@ func (consumer *BrokerConsumer) ConsumeO
 
 type MessageHandlerFunc func(msg *Message)
 
-func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) {
+func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) {
   conn, err := consumer.broker.connect()
   if err != nil {
     return -1, err
@@ -123,7 +124,7 @@ func (consumer *BrokerConsumer) Consume(
   return num, err
 }
 
-func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc)
(int, os.Error) {
+func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc)
(int, error) {
   _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
   if err != nil {
     return -1, err
@@ -142,7 +143,7 @@ func (consumer *BrokerConsumer) consumeW
     for currentOffset <= uint64(length-4) {
       totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs)
       if msgs == nil {
-        return num, os.NewError("Error Decoding Message")
+        return num, errors.New("Error Decoding Message")
       }
       msgOffset := consumer.offset + currentOffset
       for _, msg := range msgs {
@@ -164,7 +165,7 @@ func (consumer *BrokerConsumer) consumeW
 // Get a list of valid offsets (up to maxNumOffsets) before the given time, where 
 // time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset
available)
 // The result is a list of offsets, in descending order.
-func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error)
{
+func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error)
{
   offsets := make([]uint64, 0)
 
   conn, err := consumer.broker.connect()

Modified: incubator/kafka/trunk/clients/go/src/kafka.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/kafka.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/kafka.go (original)
+++ incubator/kafka/trunk/clients/go/src/kafka.go Wed Mar 14 22:58:57 2012
@@ -23,13 +23,13 @@
 package kafka
 
 import (
-  "log"
-  "net"
-  "os"
-  "fmt"
+  "bufio"
   "encoding/binary"
+  "errors"
+  "fmt"
   "io"
-  "bufio"
+  "log"
+  "net"
 )
 
 const (
@@ -48,7 +48,7 @@ func newBroker(hostname string, topic st
     hostname:  hostname}
 }
 
-func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
+func (b *Broker) connect() (conn *net.TCPConn, error error) {
   raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
   if err != nil {
     log.Println("Fatal Error: ", err)
@@ -63,7 +63,7 @@ func (b *Broker) connect() (conn *net.TC
 }
 
 // returns length of response & payload & err
-func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) {
+func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) {
   reader := bufio.NewReader(conn)
   length := make([]byte, 4)
   lenRead, err := io.ReadFull(reader, length)
@@ -71,7 +71,7 @@ func (b *Broker) readResponse(conn *net.
     return 0, []byte{}, err
   }
   if lenRead != 4 || lenRead < 0 {
-    return 0, []byte{}, os.NewError("invalid length of the packet length field")
+    return 0, []byte{}, errors.New("invalid length of the packet length field")
   }
 
   expectedLength := binary.BigEndian.Uint32(length)
@@ -82,13 +82,13 @@ func (b *Broker) readResponse(conn *net.
   }
 
   if uint32(lenRead) != expectedLength {
-    return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d  expected:
 %d", lenRead, expectedLength))
+    return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d  expected:
 %d", lenRead, expectedLength))
   }
 
   errorCode := binary.BigEndian.Uint16(messages[0:2])
   if errorCode != 0 {
     log.Println("errorCode: ", errorCode)
-    return 0, []byte{}, os.NewError(
+    return 0, []byte{}, errors.New(
       fmt.Sprintf("Broker Response Error: %d", errorCode))
   }
   return expectedLength, messages[2:], nil

Modified: incubator/kafka/trunk/clients/go/src/message.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/message.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/message.go (original)
+++ incubator/kafka/trunk/clients/go/src/message.go Wed Mar 14 22:58:57 2012
@@ -23,9 +23,9 @@
 package kafka
 
 import (
-  "hash/crc32"
-  "encoding/binary"
   "bytes"
+  "encoding/binary"
+  "hash/crc32"
   "log"
 )
 

Modified: incubator/kafka/trunk/clients/go/src/payload_codec.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/payload_codec.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/payload_codec.go (original)
+++ incubator/kafka/trunk/clients/go/src/payload_codec.go Wed Mar 14 22:58:57 2012
@@ -57,7 +57,7 @@ var DefaultCodecsMap = codecsMap(Default
 func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
   payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs))
   for _, c := range payloadCodecs {
-    payloadCodecsMap[c.Id()] = c, true
+    payloadCodecsMap[c.Id()] = c
   }
   return payloadCodecsMap
 }
@@ -65,7 +65,6 @@ func codecsMap(payloadCodecs []PayloadCo
 // No compression codec, noop
 
 type NoCompressionPayloadCodec struct {
-
 }
 
 func (codec *NoCompressionPayloadCodec) Id() byte {
@@ -83,7 +82,6 @@ func (codec *NoCompressionPayloadCodec) 
 // Gzip Codec
 
 type GzipPayloadCodec struct {
-
 }
 
 func (codec *GzipPayloadCodec) Id() byte {

Modified: incubator/kafka/trunk/clients/go/src/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/publisher.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/publisher.go (original)
+++ incubator/kafka/trunk/clients/go/src/publisher.go Wed Mar 14 22:58:57 2012
@@ -22,10 +22,6 @@
 
 package kafka
 
-import (
-  "os"
-)
-
 type BrokerPublisher struct {
   broker *Broker
 }
@@ -34,11 +30,11 @@ func NewBrokerPublisher(hostname string,
   return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
 }
 
-func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
+func (b *BrokerPublisher) Publish(message *Message) (int, error) {
   return b.BatchPublish(message)
 }
 
-func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) {
+func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) {
   conn, err := b.broker.connect()
   if err != nil {
     return -1, err

Modified: incubator/kafka/trunk/clients/go/src/request.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/request.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/request.go (original)
+++ incubator/kafka/trunk/clients/go/src/request.go Wed Mar 14 22:58:57 2012
@@ -23,8 +23,8 @@
 package kafka
 
 import (
-  "encoding/binary"
   "bytes"
+  "encoding/binary"
 )
 
 type RequestType uint16

Modified: incubator/kafka/trunk/clients/go/src/timing.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/timing.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/timing.go (original)
+++ incubator/kafka/trunk/clients/go/src/timing.go Wed Mar 14 22:58:57 2012
@@ -34,16 +34,16 @@ type Timing struct {
 }
 
 func StartTiming(label string) *Timing {
-  return &Timing{label: label, start: time.Nanoseconds(), stop: 0}
+  return &Timing{label: label, start: time.Now().UnixNano()}
 }
 
 func (t *Timing) Stop() {
-  t.stop = time.Nanoseconds()
+  t.stop = time.Now().UnixNano()
 }
 
 func (t *Timing) Print() {
   if t.stop == 0 {
     t.Stop()
   }
-  log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000)
+  log.Printf("%s took: %f ms\n", t.label, float64(t.stop-t.start)/1000000)
 }

Modified: incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/consumer/consumer.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/consumer/consumer.go (original)
+++ incubator/kafka/trunk/clients/go/tools/consumer/consumer.go Wed Mar 14 22:58:57 2012
@@ -23,12 +23,12 @@
 package main
 
 import (
-  "kafka"
   "flag"
   "fmt"
   "os"
-  "strconv"
   "os/signal"
+  "strconv"
+  kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
   "syscall"
 )
 
@@ -46,7 +46,7 @@ func init() {
   flag.StringVar(&topic, "topic", "test", "topic to publish to")
   flag.IntVar(&partition, "partition", 0, "partition to publish to")
   flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from")
-  flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from")
+  flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request")
   flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file")
   flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming")
   flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
@@ -61,7 +61,7 @@ func main() {
 
   var payloadFile *os.File = nil
   if len(writePayloadsTo) > 0 {
-    var err os.Error
+    var err error
     payloadFile, err = os.Create(writePayloadsTo)
     if err != nil {
       fmt.Println("Error opening file: ", err)
@@ -74,7 +74,7 @@ func main() {
       msg.Print()
     }
     if payloadFile != nil {
-      payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n"))
+      payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n"))
       payloadFile.Write(msg.Payload())
       payloadFile.Write([]byte("\n-------------------------------\n"))
     }
@@ -83,10 +83,17 @@ func main() {
   if consumerForever {
     quit := make(chan bool, 1)
     go func() {
+      sigIn := make(chan os.Signal)
+      signal.Notify(sigIn)
       for {
-        sig := <-signal.Incoming
-        if sig.(os.UnixSignal) == syscall.SIGINT {
-          quit <- true
+
+        select {
+        case sig := <-sigIn:
+          if sig.(os.Signal) == syscall.SIGINT {
+            quit <- true
+          } else {
+            fmt.Println(sig)
+          }
         }
       }
     }()

Modified: incubator/kafka/trunk/clients/go/tools/offsets/offsets.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/offsets/offsets.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/offsets/offsets.go (original)
+++ incubator/kafka/trunk/clients/go/tools/offsets/offsets.go Wed Mar 14 22:58:57 2012
@@ -20,13 +20,12 @@
  *  of their respective owners.
  */
 
-
 package main
 
 import (
-  "kafka"
   "flag"
   "fmt"
+  kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
 )
 
 var hostname string
@@ -43,7 +42,6 @@ func init() {
   flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that:  time(ms)/-1(latest)/-2(earliest)")
 }
 
-
 func main() {
   flag.Parse()
   fmt.Println("Offsets :")
@@ -56,7 +54,7 @@ func main() {
     fmt.Println("Error: ", err)
   }
   fmt.Printf("Offsets found: %d\n", len(offsets))
-  for i := 0 ; i < len(offsets); i++ {
+  for i := 0; i < len(offsets); i++ {
     fmt.Printf("Offset[%d] = %d\n", i, offsets[i])
   }
 }

Modified: incubator/kafka/trunk/clients/go/tools/publisher/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/publisher/publisher.go?rev=1300777&r1=1300776&r2=1300777&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/publisher/publisher.go (original)
+++ incubator/kafka/trunk/clients/go/tools/publisher/publisher.go Wed Mar 14 22:58:57 2012
@@ -23,9 +23,9 @@
 package main
 
 import (
-  "kafka"
   "flag"
   "fmt"
+  kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
   "os"
 )
 
@@ -63,7 +63,7 @@ func main() {
       fmt.Println("Error: ", err)
       return
     }
-    payload := make([]byte, stat.Size)
+    payload := make([]byte, stat.Size())
     file.Read(payload)
     timing := kafka.StartTiming("Sending")
 



Mime
View raw message