avro-user mailing list archives

From James Baldassari <jbaldass...@gmail.com>
Subject Re: Async Callbacks using Netty
Date Wed, 01 Feb 2012 06:24:42 GMT
Hi William,

Great test.  I ran your code, and it worked as expected for me, but I made
some slight changes to the client side to demonstrate what's happening:

    // Test sync call:
    System.out.println("1. " + new Date() + ": Saying Hello (sync)...");
    // This should block for 5 seconds:
    CharSequence syncResult = client.hello();
    System.out.println("2. " + new Date() + ": Chat.hello() returned \""
        + syncResult + "\"");

    // Test async call:
    final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
    System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
    client.hello(future1); // This should not block.
    System.out.println("4. " + new Date()
        + ": Chat.hello(Callback<CharSequence>) returned");
    // This should block for 5 seconds:
    CharSequence asyncResult = future1.get();
    System.out.println("5. " + new Date()
        + ": Callback<CharSequence>.get() returned \"" + asyncResult + "\"");

When I ran that I got the following output:

    1. Wed Feb 01 00:13:36 EST 2012: Saying Hello (sync)...
    2. Wed Feb 01 00:13:41 EST 2012: Chat.hello() returned "Hello"

    3. Wed Feb 01 00:13:41 EST 2012: Saying Hello (async)...
    4. Wed Feb 01 00:13:41 EST 2012: Chat.hello(Callback<CharSequence>) returned
    5. Wed Feb 01 00:13:46 EST 2012: Callback<CharSequence>.get() returned "Hello"

As you can see, the synchronous call (lines 1-2) blocked for about 5
seconds as expected.  When the asynchronous call was invoked it returned
immediately (note timestamps on lines 3-4).  The part that blocked was the
CallFuture.get() on line 5 of the output.  The result of the callback can't
be obtained until the server returns it (after waiting 5 seconds).

I think I may know why this behavior seems confusing.  In practice I don't
think many people will use CallFuture.  It's basically an adapter to make
an asynchronous call synchronous by blocking until the result returns.
This is useful in unit tests and in situations where the client can't
proceed until the result is available.  However, to really take advantage
of the asynchronous API you never want to wait for the result of an RPC.
The client should just invoke async RPCs with some Callback instance and
then move on to other things, such as invoking more RPCs!
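
To make the "adapter" idea concrete without needing Avro on the classpath,
here is a minimal stand-alone sketch.  Everything in it is an assumption for
illustration: the nested Callback interface only mirrors the two methods of
org.apache.avro.ipc.Callback, helloAsync is a made-up stand-in for a
generated async client method, and BlockingCallback is a simplified
look-alike, not the real CallFuture source:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class CallFutureSketch {
    /** Stand-in with the same two methods as org.apache.avro.ipc.Callback. */
    public interface Callback<T> {
        void handleResult(T result);
        void handleError(Throwable error);
    }

    /** Hypothetical async RPC: returns at once, replies from another thread later. */
    public static void helloAsync(long delayMillis, Callback<CharSequence> callback) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);          // simulated server-side delay
                callback.handleResult("Hello");
            } catch (InterruptedException e) {
                callback.handleError(e);
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Blocking adapter in the spirit of CallFuture: get() waits for the callback. */
    public static final class BlockingCallback<T> implements Callback<T> {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile T result;
        private volatile Throwable error;

        @Override public void handleResult(T r) { result = r; done.countDown(); }
        @Override public void handleError(Throwable e) { error = e; done.countDown(); }

        public T get() throws InterruptedException, ExecutionException {
            done.await();                           // this is the part that blocks
            if (error != null) throw new ExecutionException(error);
            return result;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingCallback<CharSequence> future = new BlockingCallback<>();
        long start = System.nanoTime();
        helloAsync(500, future);                    // returns immediately
        long afterCall = System.nanoTime();
        CharSequence reply = future.get();          // waits for the reply
        long afterGet = System.nanoTime();
        System.out.println("async call returned after ms: "
            + (afterCall - start) / 1_000_000);
        System.out.println("get() returned \"" + reply + "\" after ms: "
            + (afterGet - start) / 1_000_000);
    }
}
```

The invocation itself costs almost nothing; all of the waiting happens inside
get().  Pass a callback that does the work in handleResult instead, and the
calling thread never waits at all.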

Here's an example.  Let's say we have an e-mail server with an Avro
protocol that allows us to access the users' mailboxes.  We might have a
method to allow us to search for all messages with a subject line that
matches some regular expression.  In IDL it might look something like this:

    protocol Mail {
      record Message {
        string from;
        array<string> to;
        union { string, null } subject;
        union { string, null } body;
      }
      array<Message> findBySubject(string regexp);
    }

It doesn't really matter what the implementation of this protocol looks
like on the server side.  Searching through all messages is likely to take
some time, so what we would want to do is to fire off an async RPC as soon
as the user clicks the search button, then return control to the UI
immediately so that the user can continue doing other things while the
search is running.  Whenever the results come back we would then notify the
user or populate the search results in the UI, e.g. via ajax/comet if it's
a web app.  So we would have a Callback implementation that would look
something like this:

  public class FindBySubjectCallback implements Callback<List<Message>> {
    // RequestContext is some class that allows us to send events back to
    // the user
    private final RequestContext context;
    public FindBySubjectCallback(RequestContext context) {
      this.context = context;
    }
    @Override
    public void handleResult(List<Message> result) {
      // Notify user with results:
      context.fireSearchResultReadyEvent(result);
    }
    @Override
    public void handleError(Throwable error) {
      // Notify user that an error occurred:
      context.fireErrorEvent(error);
    }
  }

The client, which might be running in a servlet container, would then just
invoke the RPC like this:

    // Client is initialized/injected somewhere
    private Mail.Callback mailClient;
    ...
    public void findBySubject(String regexp, RequestContext context) {
      mailClient.findBySubject(regexp, new FindBySubjectCallback(context));
      // return immediately without waiting for the search to complete!
    }
    ...
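
If it helps to see why returning immediately matters, here is a rough
stand-alone sketch that issues several searches back to back.  It does not
use Avro: the Callback interface, the SERVER thread pool, findBySubjectAsync,
and the 200 ms delay are all invented for the demo, and the final wait exists
only so the demo can report what came back.  A real client would let the
callbacks push results to the user instead of waiting:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FanOutSketch {
    /** Stand-in with the same two methods as org.apache.avro.ipc.Callback. */
    public interface Callback<T> {
        void handleResult(T result);
        void handleError(Throwable error);
    }

    /** Hypothetical stand-in for the remote server: a pool of daemon threads. */
    private static final ExecutorService SERVER = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    /** Hypothetical async search: returns at once, replies about 200 ms later. */
    public static void findBySubjectAsync(String regexp, Callback<List<String>> callback) {
        SERVER.execute(() -> {
            try {
                Thread.sleep(200);                  // simulated search time
                callback.handleResult(Collections.singletonList("match for " + regexp));
            } catch (InterruptedException e) {
                callback.handleError(e);
            }
        });
    }

    /** Issues every search without waiting in between, then collects the replies. */
    public static List<String> searchAll(List<String> regexps) throws InterruptedException {
        final List<String> hits = new CopyOnWriteArrayList<>();
        final CountDownLatch pending = new CountDownLatch(regexps.size());
        for (String regexp : regexps) {
            findBySubjectAsync(regexp, new Callback<List<String>>() {
                @Override public void handleResult(List<String> result) {
                    hits.addAll(result);
                    pending.countDown();
                }
                @Override public void handleError(Throwable error) {
                    pending.countDown();            // a real client would report this
                }
            });
        }
        pending.await(5, TimeUnit.SECONDS);         // demo only: wait for all replies
        return hits;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        List<String> hits = searchAll(Arrays.asList("^Re:.*", ".*Avro.*", ".*Netty.*"));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(hits.size() + " replies in ms: " + elapsedMs);
    }
}
```

Because the three searches overlap, the total elapsed time should be close to
one search delay rather than three.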

Anyway, hope that makes some sense.  Let me know if you have any questions.

-James


On Tue, Jan 31, 2012 at 11:23 PM, William Afendy <carbotex@gmail.com> wrote:

> Hi James,
>
> Thank you for your quick response. I'm still fairly new to the async
> stuff. I fixed the ChatImpl as suggested to implement Chat instead of
> Chat.Callback. I also added a 5 secs delay in the method hello().
>
> There is still something missing, I can't really see the non-blocking
> (async) part from Netty implementation. Please take a look at the
> AvroClient.java code below, I understand when the client.hello() gets
> called, this is the synchronous (blocking) part of the code. It blocked for
> 5 seconds as expected. Now, when I'm testing the async method by creating
> future1 then pass it in client.hello(future1), this method also blocks for
> 5 seconds. I do not know how to implement the async part properly.
>
> I appreciate the link you provided but it will take me some time to digest
> your sample code. In the meantime, it would be great if you can set me
> straight by explaining why the async method is also blocking.
>
> Thanks,
>
> William
>
>
> =======================
> AvroClient.java - Client code.
> =======================
>
> public class AvroClient {
>     public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
>         try {
>             NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
>             Chat.Callback client = SpecificRequestor.getClient(
> Chat.Callback.class, transceiver);
>
>             System.out.println(client.hello()); //This should block for 5 seconds
>             System.out.println("This should print out 5 seconds later");
>
>             final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
>             client.hello(future1); //This should not block.
>             System.out.println("This should print out immediately");
>             System.out.println(future1.get());
>
>             transceiver.close();
>
>         } catch (IOException ex) {
>             System.err.println(ex);
>         }
>     }
> }
>
>
>
> ===========
> ChatImpl.java
> ===========
>
> public class ChatImpl implements Chat {
>     @Override
>     public CharSequence hello() throws AvroRemoteException {
>         try {
>             Thread.sleep(5000);
>         } catch (InterruptedException ex) {}
>         return new Utf8("Hello");
>     }
> }
>
>
>
>
>
>
>
>
>
> On Wed, Feb 1, 2012 at 11:09 AM, James Baldassari <jbaldassari@gmail.com> wrote:
>
>> Hi William,
>>
>> The documentation around the async interface is definitely lacking.
>> There should probably be a separate page on the Avro site for that.  I'll
>> try to find some time to work on it.  In the meantime you can see some
>> examples I put up on github: https://github.com/jbaldassari/Avro-RPC
>>
>> As for the problem you're having, there are no major issues with your
>> code.  The only thing wrong is that the server side (ChatImpl) should
>> implement Chat, not Chat.Callback.  One of the nice things about the async
>> interface is that it only affects the client side of the RPC; the server
>> doesn't have to have any knowledge that it's async.  So the server
>> implements the regular sync interface (Chat), and then the client is free
>> to use either the sync or async version when invoking RPCs.  Does that
>> answer your question?
>>
>> -James
>>
>>
>>
>> On Tue, Jan 31, 2012 at 8:50 PM, William Afendy <carbotex@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to implement Asynchronous calls by using NettyServer
>>> implementation. After digging the source code, I found an example on how to
>>> use NettyServer from TestNettyServerWithCallbacks.java
>>>
>>> When running a few tests, I realized that NettyServer never calls the
>>> hello(Callback) method; instead it keeps calling the synchronous hello()
>>> method. The client program prints out "Hello" but I'm expecting
>>> "Hello-ASYNC" as a result. I really have no clue what's going on.
>>>
>>> I hope someone can shed some light on this and perhaps point out the
>>> mistake or correct my logic. Below is the code I use to perform a simple
>>> asynchronous test.
>>>
>>> =======================
>>> AvroClient.java - Client code.
>>> =======================
>>>
>>> public class AvroClient {
>>>     public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
>>>         try {
>>>             NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
>>>             Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);
>>>
>>>             final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
>>>             client.hello(future1);
>>>
>>>             System.out.println(future1.get());
>>>
>>>             transceiver.close();
>>>
>>>         } catch (IOException ex) {
>>>             System.err.println(ex);
>>>         }
>>>     }
>>> }
>>>
>>> ===========================
>>> AvroNetty.java - The Server Code
>>> ===========================
>>>
>>> public class AvroNetty {
>>>     public static void main(String[] args) {
>>>         Index indexImpl = new AsyncIndexImpl();
>>>         Chat chatImpl = new ChatImpl();
>>>
>>>         Server server = new NettyServer(new SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
>>>         server.start();
>>>         System.out.println("Server is listening at port " + server.getPort());
>>>     }
>>> }
>>>
>>> ===========
>>> ChatImpl.java
>>> ===========
>>>
>>> public class ChatImpl implements Chat.Callback {
>>>     @Override
>>>     public void hello(org.apache.avro.ipc.Callback<CharSequence> callback) throws IOException {
>>>         callback.handleResult("Hello-ASYNC");
>>>     }
>>>
>>>     @Override
>>>     public CharSequence hello() throws AvroRemoteException {
>>>         return new Utf8("Hello");
>>>     }
>>> }
>>>
>>> =============================================
>>> Chat.java - This interface is auto-generated by avro-tool
>>> =============================================
>>>
>>> @SuppressWarnings("all")
>>> public interface Chat {
>>>     public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
>>>     java.lang.CharSequence hello() throws org.apache.avro.AvroRemoteException;
>>>
>>>     @SuppressWarnings("all")
>>>     public interface Callback extends Chat {
>>>         public static final org.apache.avro.Protocol PROTOCOL = avro.test.Chat.PROTOCOL;
>>>         void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence> callback) throws java.io.IOException;
>>>     }
>>> }
>>>
>>> ====================
>>> Here is the Avro Schema
>>> ====================
>>>
>>> {
>>>   "namespace": "avro.test",
>>>   "protocol": "Chat",
>>>
>>>   "types" : [],
>>>
>>>   "messages": {
>>>       "hello": {
>>>                     "request": [],
>>>                     "response": "string"
>>>       }
>>>   }
>>> }
>>>
>>>
>>> Thanks,
>>>
>>> --
>>> William Afendy
>>>
>>
>>
>
>
> --
> William Afendy
>
