avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r959787 - in /avro/trunk: ./ lang/java/ lang/java/src/java/org/apache/avro/ipc/ lang/java/src/test/java/org/apache/avro/ipc/ share/test/schemas/
Date Thu, 01 Jul 2010 21:25:02 GMT
Author: cutting
Date: Thu Jul  1 21:25:01 2010
New Revision: 959787

URL: http://svn.apache.org/viewvc?rev=959787&view=rev
Log:
AVRO-405. Java: Add Netty-based RPC transceiver and server implementation.  Contributed by
Harry Wang.

Added:
    avro/trunk/lang/java/ivysettings-jboss-repos.xml
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java
    avro/trunk/share/test/schemas/mail.avpr
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/build.xml
    avro/trunk/lang/java/ivy.xml

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=959787&r1=959786&r2=959787&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Jul  1 21:25:01 2010
@@ -25,6 +25,9 @@ Avro 1.4.0 (unreleased)
     AVRO-578. Java: add payload data to RPC context for use by
     plugins.  (Patrick Wendell via cutting)
 
+    AVRO-405: Java: Add Netty-based RPC transceiver and server
+    implementation. (Harry Wang via cutting)
+
   IMPROVEMENTS
     AVRO-584. Update Histogram for Stats Plugin
     (Patrick Wendell via philz)

Modified: avro/trunk/lang/java/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/build.xml?rev=959787&r1=959786&r2=959787&view=diff
==============================================================================
--- avro/trunk/lang/java/build.xml (original)
+++ avro/trunk/lang/java/build.xml Thu Jul  1 21:25:01 2010
@@ -184,27 +184,38 @@
     <!-- ensure that ivy taskdef is only run once, otw ant will error -->
     <property name="ivy.initialized" value="true"/>
   </target>
+  
+  <target name="ivy-retrieve-netty">
+    <ivy:settings id="ivy.jboss.settings" 
+        file="${basedir}/ivysettings-jboss-repos.xml" />
+    <ivy:retrieve type="jar" conf="default"
+        settingsRef="ivy.jboss.settings"
+        pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
+    <ivy:retrieve type="bundle" conf="default"
+        settingsRef="ivy.jboss.settings"
+        pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
+  </target>
 
-  <target name="ivy-retrieve" depends="init,ivy-init">
+  <target name="ivy-retrieve" depends="init,ivy-init,ivy-retrieve-netty">
     <ivy:retrieve type="jar" conf="default"
 		  pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
   </target>
 
-  <target name="ivy-retrieve-tools" depends="init,ivy-init">
+  <target name="ivy-retrieve-tools" depends="init,ivy-init,ivy-retrieve-netty">
     <!-- Place in separate directory, since these artificats will
          be packaged in the tools jar. -->
     <ivy:retrieve type="jar" conf="tools"
 		  pattern="${ivy.lib}/tools/[artifact]-[revision].[ext]"/>
   </target>
 
-  <target name="ivy-retrieve-test" depends="init,ivy-init">
+  <target name="ivy-retrieve-test" depends="init,ivy-init,ivy-retrieve-netty">
     <ivy:retrieve type="jar" conf="test"
 		  pattern="${ivy.test.lib}/[artifact]-[revision].[ext]"/>
   </target>
 
-  <target name="ivy-retrieve-build" depends="init,ivy-init">
+  <target name="ivy-retrieve-build" depends="init,ivy-init,ivy-retrieve-netty">
     <ivy:retrieve type="jar" conf="build"
-		  pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
+          pattern="${ivy.lib}/[artifact]-[revision].[ext]"/>
   </target>
 
   <macrodef name="java-compiler">

Modified: avro/trunk/lang/java/ivy.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ivy.xml?rev=959787&r1=959786&r2=959787&view=diff
==============================================================================
--- avro/trunk/lang/java/ivy.xml (original)
+++ avro/trunk/lang/java/ivy.xml Thu Jul  1 21:25:01 2010
@@ -65,6 +65,7 @@
                 conf="build->default;test->default;tools->default" transitive="false"/>
     <dependency org="commons-httpclient" name="commons-httpclient" rev="3.0.1"
 		conf="test->default;tools->default"/>
+    <dependency org="org.jboss.netty" name="netty" rev="3.2.1.Final"/>
   </dependencies>
 
 </ivy-module>

Added: avro/trunk/lang/java/ivysettings-jboss-repos.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ivysettings-jboss-repos.xml?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/ivysettings-jboss-repos.xml (added)
+++ avro/trunk/lang/java/ivysettings-jboss-repos.xml Thu Jul  1 21:25:01 2010
@@ -0,0 +1,38 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.    
+-->
+<ivysettings>
+    <property name="repo.maven.org"
+        value="http://repo2.maven.org/maven2/" override="false"/>
+    <property name="repo.jboss.org"
+        value="http://repository.jboss.org/nexus/content/groups/public/" override="false"/>
+    <property name="maven2.pattern"
+        value="[organisation]/[module]/[revision]/[module]-[revision]"/>
+    <property name="maven2.pattern.ext" value="${maven2.pattern}.[ext]"/>
+    <settings defaultResolver="default"/>
+    <resolvers>
+        <ibiblio name="maven2" root="${repo.maven.org}"
+            pattern="${maven2.pattern.ext}" m2compatible="true"/>
+        <ibiblio name="jboss-maven2" root="${repo.jboss.org}"
+            pattern="${maven2.pattern.ext}" m2compatible="true"/>
+        <chain name="default" dual="true">
+            <resolver ref="maven2"/>
+            <resolver ref="jboss-maven2"/>
+        </chain>
+    </resolvers>    
+</ivysettings>
\ No newline at end of file

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyServer.java Thu Jul  1 21:25:01 2010
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.avro.ipc.NettyTransportCodec.NettyDataPack;
+import org.apache.avro.ipc.NettyTransportCodec.NettyFrameDecoder;
+import org.apache.avro.ipc.NettyTransportCodec.NettyFrameEncoder;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Netty-based RPC {@link Server} implementation.
+ */
+public class NettyServer extends Thread implements Server {
+  private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class
+      .getName());
+
+  private Responder responder;
+  private InetSocketAddress addr;
+
+  private Channel serverChannel;
+  private ChannelGroup allChannels = new DefaultChannelGroup(
+      "avro-netty-server");
+  private ChannelFactory channelFactory;
+
+  public NettyServer(Responder responder, InetSocketAddress addr) {
+    this.responder = responder;
+    this.addr = addr;
+
+    setName("AvroNettyServer on " + addr);
+    setDaemon(true);
+    start();
+  }
+
+  @Override
+  public void close() {
+    ChannelGroupFuture future = allChannels.close();
+    future.awaitUninterruptibly();
+    channelFactory.releaseExternalResources();
+  }
+
+  @Override
+  public int getPort() {
+    return ((InetSocketAddress) serverChannel.getLocalAddress()).getPort();
+  }
+
+  @Override
+  public void run() {
+    channelFactory = new NioServerSocketChannelFactory(Executors
+        .newCachedThreadPool(), Executors.newCachedThreadPool());
+    ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline p = Channels.pipeline();
+        p.addLast("frameDecoder", new NettyFrameDecoder());
+        p.addLast("frameEncoder", new NettyFrameEncoder());
+        p.addLast("handler", new NettyServerAvroHandler());
+        return p;
+      }
+    });
+    serverChannel = bootstrap.bind(addr);
+    allChannels.add(serverChannel);
+  }
+
+  /**
+   * Avro server handler for the Netty transport 
+   */
+  class NettyServerAvroHandler extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
+        throws Exception {
+      if (e instanceof ChannelStateEvent) {
+        LOG.info(e.toString());
+      }
+      super.handleUpstream(ctx, e);
+    }
+
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
+        throws Exception {
+      allChannels.add(e.getChannel());
+      super.channelOpen(ctx, e);
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+      try {
+        NettyDataPack dataPack = (NettyDataPack) e.getMessage();
+        List<ByteBuffer> req = dataPack.getDatas();
+        List<ByteBuffer> res = responder.respond(req);
+        dataPack.setDatas(res);
+        e.getChannel().write(dataPack);
+      } catch (IOException ex) {
+        LOG.warn("unexpect error");
+      } finally {
+        e.getChannel().close();
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+      LOG.warn("Unexpected exception from downstream.", e.getCause());
+      e.getChannel().close();
+    }
+
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransceiver.java Thu Jul  1 21:25:01 2010
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.NettyTransportCodec.NettyDataPack;
+import org.apache.avro.ipc.NettyTransportCodec.NettyFrameDecoder;
+import org.apache.avro.ipc.NettyTransportCodec.NettyFrameEncoder;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Netty-based {@link Transceiver} implementation.
+ */
+public class NettyTransceiver extends Transceiver {
+  private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
+      .getName());
+
+  private ChannelFactory channelFactory;
+  private Channel channel;
+  
+  private AtomicInteger serialGenerator = new AtomicInteger(0);
+  private Map<Integer, CallFuture> requests = 
+    new ConcurrentHashMap<Integer, CallFuture>();
+  
+  private Protocol remote;
+  
+  public NettyTransceiver(InetSocketAddress addr) {
+    // Set up.
+    channelFactory = new NioClientSocketChannelFactory(Executors
+        .newCachedThreadPool(), Executors.newCachedThreadPool());
+    ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+
+    // Configure the event pipeline factory.
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline p = Channels.pipeline();
+        p.addLast("frameDecoder", new NettyFrameDecoder());
+        p.addLast("frameEncoder", new NettyFrameEncoder());
+        p.addLast("handler", new NettyClientAvroHandler());
+        return p;
+      }
+    });
+
+    bootstrap.setOption("tcpNoDelay", true);
+
+    // Make a new connection.
+    ChannelFuture channelFuture = bootstrap.connect(addr);
+    channelFuture.awaitUninterruptibly();
+    if (!channelFuture.isSuccess()) {
+      channelFuture.getCause().printStackTrace();
+      throw new RuntimeException(channelFuture.getCause());
+    }
+    channel = channelFuture.getChannel();
+  }
+
+  public void close() {
+    // Close the connection.
+    channel.close().awaitUninterruptibly();
+    // Shut down all thread pools to exit.
+    channelFactory.releaseExternalResources();
+  }
+
+  @Override
+  public String getRemoteName() {
+    return channel.getRemoteAddress().toString();
+  }
+
+  /**
+   * Override as non-synchronized method because the method is thread safe.
+   */
+  @Override
+  public List<ByteBuffer> transceive(List<ByteBuffer> request)
+      throws IOException {
+    int serial = serialGenerator.incrementAndGet();
+    NettyDataPack dataPack = new NettyDataPack(serial, request);
+    CallFuture callFuture = new CallFuture();
+    requests.put(serial, callFuture);
+    channel.write(dataPack);
+    try {
+      return callFuture.get();
+    } catch (InterruptedException e) {
+      LOG.warn("failed to get the response", e);
+      return null;
+    } catch (ExecutionException e) {
+      LOG.warn("failed to get the response", e);
+      return null;
+    } finally {
+      requests.remove(serial);
+    }
+  }
+
+  @Override
+  public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<ByteBuffer> readBuffers() throws IOException {
+    throw new UnsupportedOperationException();  
+  }
+  
+  @Override
+  public Protocol getRemote() {
+    return remote;
+  }
+
+  @Override
+  public boolean isConnected() {
+    return remote!=null;
+  }
+
+  @Override
+  public void setRemote(Protocol protocol) {
+    this.remote = protocol;
+  }
+
+  /**
+   * Future class for a RPC call
+   */
+  class CallFuture implements Future<List<ByteBuffer>>{
+    private Semaphore sem = new Semaphore(0);
+    private List<ByteBuffer> response = null;
+    
+    public void setResponse(List<ByteBuffer> response) {
+      this.response = response;
+      sem.release();
+    }
+    
+    public void releaseSemphore() {
+      sem.release();
+    }
+
+    public List<ByteBuffer> getResponse() {
+      return response;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public List<ByteBuffer> get() throws InterruptedException,
+        ExecutionException {
+      sem.acquire();
+      return response;
+    }
+
+    @Override
+    public List<ByteBuffer> get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if (sem.tryAcquire(timeout, unit)) {
+        return response;
+      } else {
+        throw new TimeoutException();
+      }
+    }
+
+    @Override
+    public boolean isDone() {
+      return sem.availablePermits()>0;
+    }
+    
+  }
+
+  /**
+   * Avro client handler for the Netty transport 
+   */
+  class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
+        throws Exception {
+      if (e instanceof ChannelStateEvent) {
+        LOG.info(e.toString());
+      }
+      super.handleUpstream(ctx, e);
+    }
+
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
+        throws Exception {
+      // channel = e.getChannel();
+      super.channelOpen(ctx, e);
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
+      NettyDataPack dataPack = (NettyDataPack)e.getMessage();
+      CallFuture callFuture = requests.get(dataPack.getSerial());
+      if (callFuture==null) {
+        throw new RuntimeException("Missing previous call info");
+      }
+      callFuture.setResponse(dataPack.getDatas());
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+      LOG.warn("Unexpected exception from downstream.", e.getCause());
+      e.getChannel().close();
+      // let the blocking waiting exit
+      Iterator<CallFuture> it = requests.values().iterator();
+      while(it.hasNext()) {
+        it.next().releaseSemphore();
+        it.remove();
+      }
+      
+    }
+
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/NettyTransportCodec.java Thu Jul  1 21:25:01 2010
@@ -0,0 +1,165 @@
+package org.apache.avro.ipc;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+/**
+ * Data structure, encoder and decoder classes for the Netty transport. 
+ */
+public class NettyTransportCodec {
+  /**
+   * Transport protocol data structure when using Netty. 
+   */
+  public static class NettyDataPack {
+    private int serial; // to track each call in client side
+    private List<ByteBuffer> datas;
+
+    public NettyDataPack() {}
+    
+    public NettyDataPack(int serial, List<ByteBuffer> datas) {
+      this.serial = serial;
+      this.datas = datas;
+    }
+    
+    public void setSerial(int serial) {
+      this.serial = serial;
+    }
+
+    public int getSerial() {
+      return serial;
+    }
+    
+    public void setDatas(List<ByteBuffer> datas) {
+      this.datas = datas;
+    }
+
+    public List<ByteBuffer> getDatas() {
+      return datas;
+    }
+    
+  }
+  
+  /**
+   * Protocol encoder which converts NettyDataPack which contains the 
+   * Responder's output List&lt;ByteBuffer&gt; to ChannelBuffer needed 
+   * by Netty.
+   */
+  public static class NettyFrameEncoder extends OneToOneEncoder {
+
+    /**
+     * encode msg to ChannelBuffer
+     * @param msg NettyDataPack from 
+     *            NettyServerAvroHandler/NettyClientAvroHandler in the pipeline
+     * @return encoded ChannelBuffer
+     */
+    @Override
+    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        throws Exception {
+      NettyDataPack dataPack = (NettyDataPack)msg;
+      List<ByteBuffer> origs = dataPack.getDatas();
+      List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(origs.size() * 2 + 1);
+      bbs.add(getPackHeader(dataPack)); // prepend a pack header including serial number and list size
+      for (ByteBuffer b : origs) {
+        bbs.add(getLengthHeader(b)); // for each buffer prepend length field
+        bbs.add(b);
+      }
+
+      return ChannelBuffers
+          .wrappedBuffer(bbs.toArray(new ByteBuffer[bbs.size()]));
+    }
+    
+    private ByteBuffer getPackHeader(NettyDataPack dataPack) {
+      ByteBuffer header = ByteBuffer.allocate(8);
+      header.putInt(dataPack.getSerial());
+      header.putInt(dataPack.getDatas().size());
+      header.flip();
+      return header;
+    }
+
+    private ByteBuffer getLengthHeader(ByteBuffer buf) {
+      ByteBuffer header = ByteBuffer.allocate(4);
+      header.putInt(buf.limit());
+      header.flip();
+      return header;
+    }
+  }
+
+  /**
+   * Protocol decoder which converts Netty's ChannelBuffer to 
+   * NettyDataPack which contains a List&lt;ByteBuffer&gt; needed 
+   * by Avro Responder.
+   */
+  public static class NettyFrameDecoder extends FrameDecoder {
+    private boolean packHeaderRead = false;
+    private int listSize;
+    private NettyDataPack dataPack;
+    
+    /**
+     * decode buffer to NettyDataPack
+     */
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel,
+        ChannelBuffer buffer) throws Exception {
+
+      if (!packHeaderRead) {
+        if (decodePackHeader(ctx, channel, buffer)) {
+          packHeaderRead = true;
+        }
+        return null;
+      } else {
+        if (decodePackBody(ctx, channel, buffer)) {
+          packHeaderRead = false; // reset state
+          return dataPack;
+        } else {
+          return null;
+        }
+      }
+      
+    }
+    
+    private boolean decodePackHeader(ChannelHandlerContext ctx, Channel channel,
+        ChannelBuffer buffer) throws Exception {
+      if (buffer.readableBytes()<8) {
+        return false;
+      }
+
+      int serial = buffer.readInt();
+      listSize = buffer.readInt();
+      dataPack = new NettyDataPack(serial, new ArrayList<ByteBuffer>(listSize));
+      return true;
+    }
+    
+    private boolean decodePackBody(ChannelHandlerContext ctx, Channel channel,
+        ChannelBuffer buffer) throws Exception {
+      if (buffer.readableBytes() < 4) {
+        return false;
+      }
+
+      buffer.markReaderIndex();
+      
+      int length = buffer.readInt();
+
+      if (buffer.readableBytes() < length) {
+        buffer.resetReaderIndex();
+        return false;
+      }
+
+      ByteBuffer bb = ByteBuffer.allocate(length);
+      buffer.readBytes(bb);
+      bb.flip();
+      dataPack.getDatas().add(bb);
+      
+      return dataPack.getDatas().size()==listSize;
+    }
+
+  }
+  
+}

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=959787&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestNettyServer.java Thu Jul  1 21:25:01 2010
@@ -0,0 +1,58 @@
+package org.apache.avro.ipc;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.test.Mail;
+import org.apache.avro.test.Message;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+public class TestNettyServer {
+
+  public static class MailImpl implements Mail {
+    // in this simple example just return details of the message
+    public Utf8 send(Message message) {
+      return new Utf8("Sent message to [" + message.to.toString() + "] from ["
+          + message.from.toString() + "] with body [" + message.body.toString()
+          + "]");
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    // start server
+    System.out.println("starting server...");
+    Responder responder = new SpecificResponder(Mail.class, new MailImpl());
+    Server server = new NettyServer(responder, new InetSocketAddress(0));
+    Thread.sleep(1000); // waiting for server startup
+
+    int serverPort = server.getPort();
+    System.out.println("server port : " + serverPort);
+
+    // client
+    Transceiver transceiver = new NettyTransceiver(new InetSocketAddress(
+        serverPort));
+    Mail proxy = (Mail) SpecificRequestor.getClient(Mail.class, transceiver);
+
+    Message msg = new Message();
+    msg.to = new Utf8("wife");
+    msg.from = new Utf8("husband");
+    msg.body = new Utf8("I love you!");
+
+    try {
+      Utf8 result = proxy.send(msg);
+      System.out.println("Result: " + result);
+      Assert.assertEquals(
+          "Sent message to [wife] from [husband] with body [I love you!]",
+          result.toString());
+    } finally {
+      transceiver.close();
+      server.close();
+    }
+  }
+
+}

Added: avro/trunk/share/test/schemas/mail.avpr
URL: http://svn.apache.org/viewvc/avro/trunk/share/test/schemas/mail.avpr?rev=959787&view=auto
==============================================================================
--- avro/trunk/share/test/schemas/mail.avpr (added)
+++ avro/trunk/share/test/schemas/mail.avpr Thu Jul  1 21:25:01 2010
@@ -0,0 +1,20 @@
+{"namespace": "org.apache.avro.test",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     }
+ }
+}
\ No newline at end of file



Mime
View raw message