avro-user mailing list archives

From Lee Hambley <lee.hamb...@gmail.com>
Subject Help with Ruby RPC ()
Date Thu, 17 Jul 2014 14:03:28 GMT
Hi List,

As part of my freelance work supporting a Java team, I have been asked to
provide them with Flume messages using Avro serialisation.

First of all, there are some problems with the trunk of the Ruby gem. I
would gladly contribute fixes, but I suppose (perhaps someone could confirm)
that the correct place is the dev mailing list, where I can send unified
diffs. The changes are small: a typo in a gem dependency name (which I have
fixed) and incorrect syntax for the error constants.

I cannot rule out other problems with the current trunk version of the gem;
perhaps I should back-port my changes and try a prior version.

Finally, to my problem. I believe the problem is that contrary to the
canonical Ruby client example
<http://github.com/apache/avro/blob/trunk/lang/ruby/test/sample_ipc_client.rb>.
I'm not absolutely clear on the distinction between a schema and a protocol
(I'd imagine the former relates purely to serialization, and the latter to
the RPC functionality).
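As a rough guide for the archives: in Avro, a schema describes a single data type, whereas a protocol (used for RPC) bundles a list of named types together with a set of messages, each of which declares named request parameters and a response type. The sketch below illustrates that split using only Ruby's standard json library on a trimmed copy of the protocol in this message (only the append message is kept). It is an illustration of the structure, not how the avro gem itself parses protocols.

```ruby
require 'json'

# Trimmed copy of the AvroSourceProtocol from this message (appendBatch omitted).
PROTOCOL_JSON = <<~JSON
  {
    "protocol": "AvroSourceProtocol",
    "namespace": "org.apache.flume.source.avro",
    "types": [
      {"type": "enum", "name": "Status", "symbols": ["OK", "FAILED", "UNKNOWN"]},
      {"type": "record", "name": "AvroFlumeEvent", "fields": [
        {"name": "headers", "type": {"type": "map", "values": "string"}},
        {"name": "body", "type": "bytes"}
      ]}
    ],
    "messages": {
      "append": {
        "request": [{"name": "event", "type": "AvroFlumeEvent"}],
        "response": "Status"
      }
    }
  }
JSON

protocol = JSON.parse(PROTOCOL_JSON)

# The "types" part: named schemas, each usable on its own for plain serialisation.
schema_names = protocol['types'].map { |t| t['name'] }

# The "messages" part: the RPC calls, each with named request parameters
# and a response type (here, the Status enum declared above).
messages = protocol['messages'].transform_values do |m|
  { params: m['request'].map { |p| p['name'] }, response: m['response'] }
end

puts "schemas:  #{schema_names.join(', ')}"
messages.each do |name, m|
  puts "message #{name}(#{m[:params].join(', ')}) -> #{m[:response]}"
end
```

Running it lists the two declared types (Status, AvroFlumeEvent) and then the single message as append(event) -> Status, showing that the record type and the message parameter named "event" are two different things.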

The code, in a readable format, is at
https://gist.github.com/leehambley/0c7c24c0cc2cd5dac9e4, and is reproduced
below for the list archives:

require 'socket'
require 'avro'

flume_protocol_json = <<-EOF
{
  "protocol": "AvroSourceProtocol",
  "namespace": "org.apache.flume.source.avro",
  "types": [
    {
      "type": "enum",
      "name": "Status",
      "symbols": [
        "OK",
        "FAILED",
        "UNKNOWN"
      ]
    },
    {
      "type": "record",
      "name": "AvroFlumeEvent",
      "fields": [
        {
          "name": "headers",
          "type": {
            "type": "map",
            "values": "string"
          }
        },
        {
          "name": "body",
          "type": "bytes"
        }
      ]
    }
  ],
  "messages": {
    "append": {
      "request": [
        {
          "name": "event",
          "type": "AvroFlumeEvent"
        }
      ],
      "response": "Status"
    },
    "appendBatch": {
      "request": [
        {
          "name": "events",
          "type": {
            "type": "array",
            "items": "AvroFlumeEvent"
          }
        }
      ],
      "response": "Status"
    }
  }
}
EOF

flume_protocol = Avro::Protocol.parse(flume_protocol_json)

namespace :flume do
  task :ping do
    sock      = TCPSocket.new('data-4.hadoop-1.fra2.xing.com', 4444)
    client    = Avro::IPC::SocketTransport.new(sock)
    requestor = Avro::IPC::Requestor.new(flume_protocol, client)

    params = {
      "event" => {
        "headers" => { "source" => "search_keywords" },
        "body"    => "test\tkeywords"
      }
    }

    result = requestor.request('append', [params])
    puts("Result: " + result)
  end
end


I wonder if someone might be able to shed some light on why, almost
regardless of what I try, I can't seem to get past this error:

Avro::IO::AvroTypeError: The datum
[{"event"=>{"headers"=>{"source"=>"search_keywords"}, "body"=>"test\tkeywords"}}]
is not an example of schema
[{"name":"event","type":{"type":"record","name":"AvroFlumeEvent","namespace":"org.apache.flume.source.avro","fields":[{"name":"headers","type":{"type":"map","values":"string"}},{"name":"body","type":"bytes"}]}}]


I have also tried { "event" => { "headers" => { "source" =>
"search_keywords" }, "body" => "test\tkeywords" } }, { "AvroFlumeEvent" =>
{ "headers" => { "source" => "search_keywords" }, "body" =>
"test\tkeywords" } } and everything else I could think of, with little or
no effect.
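The error message gives a hint about the expected shape. The schema it prints is the list of the append message's parameters, which the Ruby library appears to validate like a record, and a record datum is normally a Hash keyed by field name; the datum it prints, by contrast, is an Array wrapping such a Hash. The sketch below is a deliberately simplified, hypothetical shape check (request_shape_ok? is not part of the avro gem, and it does not check field types) applied to the candidate data mentioned above.

```ruby
require 'json'

# Request parameters of the "append" message, copied from the protocol above.
APPEND_REQUEST = JSON.parse('[{"name": "event", "type": "AvroFlumeEvent"}]')

# Hypothetical, simplified check (NOT the avro gem's validator): treat the
# request like a record whose fields are the message parameters, so the datum
# must be a Hash containing one key per parameter name. Field types are ignored.
def request_shape_ok?(request_params, datum)
  datum.is_a?(Hash) && request_params.all? { |p| datum.key?(p['name']) }
end

event = {
  'headers' => { 'source' => 'search_keywords' },
  'body'    => "test\tkeywords"
}

candidates = {
  'array around params'     => [{ 'event' => event }],
  'keyed by type name'      => { 'AvroFlumeEvent' => event },
  'keyed by parameter name' => { 'event' => event }
}

# Prints one line per candidate with true/false from the shape check.
candidates.each do |label, datum|
  puts "#{label}: #{request_shape_ok?(APPEND_REQUEST, datum)}"
end
```

Only the last candidate passes this check. If the reading above is right, the call would be requestor.request('append', params) without the surrounding array, which appears to be how sample_ipc_client.rb passes its own params hash.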

I spent some time with the debugger, and it appears that the serializer is
failing on some internal types. But until I can rule out that my message is
causing the problem, I can't really continue.

Thanks,

Lee Hambley
--
http://lee.hambley.name/
