arrow-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jasper Rädisch <jas...@raedisch.net>
Subject [go+Flight] DoExchange missing Descriptor
Date Tue, 02 Mar 2021 06:55:10 GMT
Hi everyone,

newbie trying to get into Flight with Go here.

Invoking DoAction was pretty straight forward, but now I have problems with DoExchange, namely:

```
 rpc error: code = Unknown desc = Descriptor missing on first message
```

I managed wrapping flight.FlightServiceClient.DoExchange into a flight.NewRecordWriter, but
there is AFAIK no way to set the FlightDescriptor, which lead me to the assumption, that the
first message has to be sent manually via DoEX.Send(<build FlightData here>).

Is there a preferred way/helper to build said FlightData? Am I on the right track at all?
I will keep digging into the source, but am thankful for any help.

Jasper

Complete client.go so far:


```
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/flight"
	"github.com/apache/arrow/go/arrow/ipc"
	"github.com/apache/arrow/go/arrow/memory"
	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:3001", grpc.WithInsecure(), grpc.WithBlock())
	c := flight.NewFlightServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
	defer cancel()
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	i := 0
	doEX, err := c.DoExchange(ctx)
	if err != nil {
		panic(err)
	}
	// md := arrow.NewMetadata([]string{"f1-i64"}, []string{"v3"})
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "f1-i64", Type: arrow.PrimitiveTypes.Int64},
	}, nil)
	pool := memory.NewGoAllocator()
	w := flight.NewRecordWriter(doEX, ipc.WithSchema(schema), ipc.WithAllocator(pool))
	b := array.NewRecordBuilder(pool, schema)
	defer b.Release()

	for {
		b.Field(0).(*array.Int64Builder).Append(int64(i))
		r := b.NewRecord()
		w.Write(r)
		if err != nil {
			panic(err)
		}
		fmt.Println(doEX.Recv())
		defer r.Release()
		time.Sleep(1 * time.Second)
		i = i + 1
		if i == 10 {
			break
		}
	}
	doEX.CloseSend()
}
```

Mime
View raw message