avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r818293 - in /hadoop/avro/trunk: ./ src/doc/content/xdocs/ src/java/org/apache/avro/ipc/ src/py/avro/ src/test/java/org/apache/avro/
Date Wed, 23 Sep 2009 22:34:15 GMT
Author: cutting
Date: Wed Sep 23 22:34:14 2009
New Revision: 818293

URL: http://svn.apache.org/viewvc?rev=818293&view=rev
Log:
AVRO-115.  Remove RPC sessions.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
    hadoop/avro/trunk/src/py/avro/ipc.py
    hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Sep 23 22:34:14 2009
@@ -4,6 +4,10 @@
 
   INCOMPATIBLE CHANGES
 
+    AVRO-115. Remove RPC's session notion to facilitate the use of
+    stateless transports like UDP and HTTP.  Add a UDP transport.
+    (cutting)
+
   NEW FEATURES
 
   IMPROVEMENTS

Modified: hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/doc/content/xdocs/spec.xml?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/doc/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/src/doc/content/xdocs/spec.xml Wed Sep 23 22:34:14 2009
@@ -687,15 +687,7 @@
 
 	<p>A transport is a system that supports:</p>
 	<ul>
-	  <li><strong>session creation</strong>
-	    <p>A session forms the context under which multiple
-	    messages may be transcieved. A client must establish a
-	    session with a server before any requests may be
-	    processed.</p>
-	  </li>
 	  <li><strong>transmission of request messages</strong>
-	    <p>Once a session has been established, clients may send
-	    servers request messages using that session.</p>
 	  </li>
 	  <li><strong>receipt of corresponding response messages</strong>
 	    <p>Servers will send a response message back to the client
@@ -752,14 +744,15 @@
       <section>
 	<title>Handshake</title>
 
-	<p>RPC sessions are initiated by handshake.  The purpose of
-	the handshake is to ensure that the client and the server have
-	each other's protocol definition, so that the client can
-	correctly deserialize responses, and the server can correctly
-	deserialize requests.  Both clients and servers should
-	maintain a cache of recently seen protocols, so that, in most
-	cases, a handshake will be completed without extra round-trip
-	network exchanges or the transmission of full protocol text.</p>
+	<p>RPC requests and responses are prefixed by handshakes.  The
+	purpose of the handshake is to ensure that the client and the
+	server have each other's protocol definition, so that the
+	client can correctly deserialize responses, and the server can
+	correctly deserialize requests.  Both clients and servers
+	should maintain a cache of recently seen protocols, so that,
+	in most cases, a handshake will be completed without extra
+	round-trip network exchanges or the transmission of full
+	protocol text.</p>
 
 	<p>The handshake process uses the following record schemas:</p>
 
@@ -793,7 +786,7 @@
 	</source>
 
         <ul>
-	  <li>In a new session, a client first sends
+	  <li>A client first prefixes each request with
 	  a <code>HandshakeRequest</code> containing just the hash of
 	  its protocol and of the server's protocol
 	  (<code>clientHash!=null, clientProtocol=null,
@@ -810,19 +803,22 @@
 	      serverHash=null</code> if the client sent the valid hash
 	      of the server's protocol and the server knows what
 	      protocol corresponds to the client's hash. In this case,
-	      the request is complete and the session is
-	      established.</li>
+	      the request is complete and the response data
+	      immediately follows the HandshakeResponse.</li>
 
 	      <li><code>match=CLIENT, serverProtocol!=null,
 	      serverHash!=null</code> if the server has previously
 	      seen the client's protocol, but the client sent an
-	      incorrect hash of the server's protocol. The client must
-	      then re-send the request with the correct server
-	      hash.</li>
+	      incorrect hash of the server's protocol. The request is
+	      complete and the response data immediately follows the
+	      HandshakeResponse. The client must use the returned
+	      protocol to process the response and should also cache
+	      that protocol and its hash for future interactions with
+	      this server.</li>
 
               <li><code>match=NONE, serverProtocol!=null,
 	      serverHash!=null</code> if the server has not previously
-	      seen the client's protocol and the client sent and
+	      seen the client's protocol and the client sent an
 	      incorrect hash of the server's protocol.
 
 	      <p>In this case The client must then re-submit its
@@ -836,19 +832,6 @@
 	  </li>
 	</ul>
 
-	<p>Until a connection is established, call request data sent
-	by the client must be preceded by
-	a <code>HandshakeRequest</code> and call response data
-	returned by the server must be preceded by a
-	<code>HandshakeResponse</code>. A connection is not
-	established until a <code>HandshakeResponse</code> with
-	<code>match=BOTH</code> or <code>match=CLIENT</code> is
-	returned.  In these cases, the call response data immmediately
-	follows
-	the <code>HandShakeResponse</code>. When <code>match=NONE</code>
-	no response call data is sent and the request call data is
-	ignored.</p>
-
 	<p>The <code>meta</code> field is reserved for future
 	handshake enhancements.</p>
 

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java?rev=818293&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramServer.java Wed Sep 23 22:34:14 2009
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A simple datagram-based server implementation. */
+public class DatagramServer extends Thread implements Server {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(DatagramServer.class);
+
+  private final Responder responder;
+  private final DatagramChannel channel;
+  private final Transceiver transceiver;
+
+  public DatagramServer(Responder responder, SocketAddress addr)
+    throws IOException {
+    String name = "DatagramServer on "+addr;
+
+    this.responder = responder;
+
+    this.channel = DatagramChannel.open();
+    channel.socket().bind(addr);
+
+    this.transceiver = new DatagramTransceiver(channel);
+
+    setName(name);
+    setDaemon(true);
+    start();
+  }
+
+  public int getPort() { return channel.socket().getLocalPort(); }
+
+  public void run() {
+    while (true) {
+      try {
+        transceiver.writeBuffers(responder.respond(transceiver.readBuffers()));
+      } catch (ClosedChannelException e) {
+        return;
+      } catch (IOException e) {
+        LOG.warn("unexpected error", e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+  
+  public void close() { this.interrupt(); }
+
+  public static void main(String[] arg) throws Exception {
+    DatagramServer server = new DatagramServer(null, new InetSocketAddress(0));
+    System.out.println("started");
+    server.join();
+  }
+
+}
+

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java?rev=818293&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/DatagramTransceiver.java Wed Sep 23 22:34:14 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A datagram-based {@link Transceiver} implementation. */
+public class DatagramTransceiver extends Transceiver {
+  private static final Logger LOG
+    = LoggerFactory.getLogger(DatagramTransceiver.class);
+
+  private static final int MAX_SIZE = 16 * 1024;
+
+  private DatagramChannel channel;
+  private SocketAddress remote;
+  private ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
+
+  public String getRemoteName() { return remote.toString(); }
+
+  public DatagramTransceiver(SocketAddress remote) throws IOException {
+    this(DatagramChannel.open());
+    this.remote = remote;
+  }
+
+  public DatagramTransceiver(DatagramChannel channel) {
+    this.channel = channel;
+  }
+
+  public synchronized List<ByteBuffer> readBuffers() throws IOException {
+    buffer.clear();
+    remote = channel.receive(buffer);
+    LOG.info("received from "+remote);
+    buffer.flip();
+    List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    while (true) {
+      int length = buffer.getInt();
+      if (length == 0) {                          // end of buffers
+        return buffers;
+      }
+      ByteBuffer chunk = buffer.slice();          // use data without copying
+      chunk.limit(length);
+      buffer.position(buffer.position()+length);
+      buffers.add(chunk);
+    }
+  }
+
+  public synchronized void writeBuffers(List<ByteBuffer> buffers)
+    throws IOException {
+    buffer.clear();
+    for (ByteBuffer b : buffers) {
+      buffer.putInt(b.remaining());
+      buffer.put(b);                              // copy data.  sigh.
+    }
+    buffer.putInt(0);
+    buffer.flip();
+    channel.send(buffer, remote);
+    LOG.info("sent to "+remote);
+  }
+
+}

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java Wed Sep 23 22:34:14 2009
@@ -27,49 +27,49 @@
  * This class represents the context of an RPC call or RPC handshake.
  * Designed to provide information to RPC plugin writers,
  * this class encapsulates information about the rpc exchange,
- * including per-session and per-call metadata.
+ * including handshake and call metadata.
  *
  */
 public class RPCContext {
   
-  protected Map<Utf8,ByteBuffer> requestSessionMeta, responseSessionMeta;
+  protected Map<Utf8,ByteBuffer> requestHandshakeMeta, responseHandshakeMeta;
   protected Map<Utf8,ByteBuffer> requestCallMeta, responseCallMeta;
   
   protected Object response;
   protected AvroRemoteException error;
   
   /**
-   * This is an access method for the session state
+   * This is an access method for the handshake state
    * provided by the client to the server.
-   * @return a map representing session state from
+   * @return a map representing handshake state from
    * the client to the server
    */
-  public Map<Utf8,ByteBuffer> requestSessionMeta() {
-    if (requestSessionMeta == null) {
-      requestSessionMeta = new HashMap<Utf8,ByteBuffer>();
+  public Map<Utf8,ByteBuffer> requestHandshakeMeta() {
+    if (requestHandshakeMeta == null) {
+      requestHandshakeMeta = new HashMap<Utf8,ByteBuffer>();
     }
-    return requestSessionMeta;
+    return requestHandshakeMeta;
   }
   
-  void setRequestSessionMeta(Map<Utf8,ByteBuffer> newmeta) {
-    requestSessionMeta = newmeta;
+  void setRequestHandshakeMeta(Map<Utf8,ByteBuffer> newmeta) {
+    requestHandshakeMeta = newmeta;
   }
   
   /**
-   * This is an access method for the session state
+   * This is an access method for the handshake state
    * provided by the server back to the client
-   * @return a map representing session state from
+   * @return a map representing handshake state from
    * the server to the client
    */
-  public Map<Utf8,ByteBuffer> responseSessionMeta() {
-    if (responseSessionMeta == null) {
-      responseSessionMeta = new HashMap<Utf8,ByteBuffer>();
+  public Map<Utf8,ByteBuffer> responseHandshakeMeta() {
+    if (responseHandshakeMeta == null) {
+      responseHandshakeMeta = new HashMap<Utf8,ByteBuffer>();
     }
-    return responseSessionMeta;
+    return responseHandshakeMeta;
   }
   
-  void setResponseSessionMeta(Map<Utf8,ByteBuffer> newmeta) {
-    responseSessionMeta = newmeta;
+  void setResponseHandshakeMeta(Map<Utf8,ByteBuffer> newmeta) {
+    responseHandshakeMeta = newmeta;
   }
   
   /**

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java Wed Sep 23 22:34:14 2009
@@ -27,20 +27,20 @@
   
   /**
    * Called on the client before the initial RPC handshake to
-   * setup any per-session metadata for this plugin
-   * @param context the per-sesion rpc context
+   * setup any handshake metadata for this plugin
+   * @param context the handshake rpc context
    */
   public void clientStartConnect(RPCContext context) { }
   
   /**
    * Called on the server during the RPC handshake
-   * @param context the per-sesion rpc context
+   * @param context the handshake rpc context
    */
   public void serverConnecting(RPCContext context) { }
   
   /**
    * Called on the client after the initial RPC handshake
-   * @param context the per-sesion rpc context
+   * @param context the handshake rpc context
    */
   public void clientFinishConnect(RPCContext context) { }
   

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Wed Sep 23 22:34:14 2009
@@ -55,7 +55,7 @@
 
   private Protocol local;
   private Protocol remote;
-  private boolean established, sendLocalText;
+  private boolean sendLocalText;
   private Transceiver transceiver;
   
   protected List<RPCPlugin> rpcMetaPlugins;
@@ -91,8 +91,7 @@
       ByteBufferOutputStream bbo = new ByteBufferOutputStream();
       Encoder out = new BinaryEncoder(bbo);
 
-      if (!established)                           // if not established
-        writeHandshake(out);                      // prepend handshake
+      writeHandshake(out);                      // prepend handshake
 
       // use local protocol to write request
       m = getLocal().getMessages().get(messageName);
@@ -112,9 +111,7 @@
       
       ByteBufferInputStream bbi = new ByteBufferInputStream(response);
       in = new BinaryDecoder(bbi);
-      if (!established)                           // if not established
-        readHandshake(in);                        // process handshake
-    } while (!established);
+    } while (!readHandshake(in));
 
     // use remote protocol to read response
     m = getRemote().getMessages().get(messageName);
@@ -172,13 +169,14 @@
     for (RPCPlugin plugin : rpcMetaPlugins) {
       plugin.clientStartConnect(context);
     }
-    handshake.meta = context.requestSessionMeta();
+    handshake.meta = context.requestHandshakeMeta();
     
     HANDSHAKE_WRITER.write(handshake, out);
   }
 
   @SuppressWarnings("unchecked")
-  private void readHandshake(Decoder in) throws IOException {
+  private boolean readHandshake(Decoder in) throws IOException {
+    boolean established = false;
     HandshakeResponse handshake =
       (HandshakeResponse)HANDSHAKE_READER.read(null, in);
     switch (handshake.match) {
@@ -201,12 +199,13 @@
     
     RPCContext context = new RPCContext();
     if (handshake.meta != null) {
-      context.setResponseSessionMeta((Map<Utf8, ByteBuffer>) handshake.meta);
+      context.setResponseHandshakeMeta((Map<Utf8, ByteBuffer>) handshake.meta);
     }
       
     for (RPCPlugin plugin : rpcMetaPlugins) {
       plugin.clientFinishConnect(context);
     }
+    return established;
   }
 
   private void setRemote(HandshakeResponse handshake) {

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Wed Sep 23 22:34:14 2009
@@ -26,7 +26,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.WeakHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,8 +55,6 @@
   private static final GenericDatumWriter<Map<Utf8,ByteBuffer>> META_WRITER =
     new GenericDatumWriter<Map<Utf8,ByteBuffer>>(META);
 
-  private Map<Transceiver,Protocol> remotes
-    = Collections.synchronizedMap(new WeakHashMap<Transceiver,Protocol>());
   private Map<MD5,Protocol> protocols
     = Collections.synchronizedMap(new HashMap<MD5,Protocol>());
 
@@ -87,18 +84,14 @@
 
   /** Called by a server to deserialize a request, compute and serialize
    * a response or error. */
-  public List<ByteBuffer> respond(Transceiver transceiver) throws IOException {
-    ByteBufferInputStream bbi =
-      new ByteBufferInputStream(transceiver.readBuffers());
-    
-    Decoder in = new BinaryDecoder(bbi);
-    ByteBufferOutputStream bbo =
-      new ByteBufferOutputStream();
+  public List<ByteBuffer> respond(List<ByteBuffer> buffers) throws IOException {
+    Decoder in = new BinaryDecoder(new ByteBufferInputStream(buffers));
+    ByteBufferOutputStream bbo = new ByteBufferOutputStream();
     Encoder out = new BinaryEncoder(bbo);
     AvroRemoteException error = null;
     RPCContext context = new RPCContext();
     try {
-      Protocol remote = handshake(transceiver, in, out);
+      Protocol remote = handshake(in, out);
       if (remote == null)                        // handshake failed
         return bbo.getBufferList();
 
@@ -163,20 +156,14 @@
     new SpecificDatumReader(HandshakeRequest._SCHEMA);
 
   @SuppressWarnings("unchecked")
-  private Protocol handshake(Transceiver transceiver,
-                             Decoder in, Encoder out)
+  private Protocol handshake(Decoder in, Encoder out)
     throws IOException {
-    Protocol remote = remotes.get(transceiver);
-    if (remote != null) return remote;            // already established
-      
     HandshakeRequest request = (HandshakeRequest)handshakeReader.read(null, in);
-    remote = protocols.get(request.clientHash);
+    Protocol remote = protocols.get(request.clientHash);
     if (remote == null && request.clientProtocol != null) {
       remote = Protocol.parse(request.clientProtocol.toString());
       protocols.put(request.clientHash, remote);
     }
-    if (remote != null)
-      remotes.put(transceiver, remote);
     HandshakeResponse response = new HandshakeResponse();
     if (localHash.equals(request.serverHash)) {
       response.match =
@@ -191,12 +178,12 @@
     }
     
     RPCContext context = new RPCContext();
-    context.setRequestSessionMeta((Map<Utf8, ByteBuffer>) request.meta);
+    context.setRequestHandshakeMeta((Map<Utf8, ByteBuffer>) request.meta);
     
     for (RPCPlugin plugin : rpcMetaPlugins) {
       plugin.serverConnecting(context);
     }
-    response.meta = context.responseSessionMeta();
+    response.meta = context.responseHandshakeMeta();
     
     handshakeWriter.write(response, out);
     return remote;

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java?rev=818293&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Server.java Wed Sep 23 22:34:14 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+/** A server listening on a port. */
+public interface Server {
+  /** The port this server runs on. */
+  int getPort();
+
+  /** Stop this server. */
+  void close();
+  
+}

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketServer.java Wed Sep 23 22:34:14 2009
@@ -29,7 +29,7 @@
 import org.slf4j.LoggerFactory;
 
 /** A simple socket-based server implementation. */
-public class SocketServer extends Thread {
+public class SocketServer extends Thread implements Server {
   private static final Logger LOG = LoggerFactory.getLogger(SocketServer.class);
 
   private Responder responder;
@@ -88,7 +88,7 @@
       try {
         try {
           while (true) {
-            writeBuffers(responder.respond(this));
+            writeBuffers(responder.respond(readBuffers()));
           }
         } catch (ClosedChannelException e) {
           return;

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/SocketTransceiver.java Wed Sep 23 22:34:14 2009
@@ -36,17 +36,17 @@
   private SocketChannel channel;
   private ByteBuffer header = ByteBuffer.allocate(4);
   
-  public String getRemoteName() {
-    return channel.socket().getRemoteSocketAddress().toString();
-  }
-
   public SocketTransceiver(SocketAddress address) throws IOException {
     this(SocketChannel.open(address));
   }
 
   public SocketTransceiver(SocketChannel channel) {
     this.channel = channel;
-    LOG.info("open to "+channel.socket().getRemoteSocketAddress());
+    LOG.info("open to "+getRemoteName());
+  }
+
+  public String getRemoteName() {
+    return channel.socket().getRemoteSocketAddress().toString();
   }
 
   public synchronized List<ByteBuffer> readBuffers() throws IOException {
@@ -87,8 +87,10 @@
   }
 
   public void close() throws IOException {
-    LOG.info("closing to "+channel.socket().getRemoteSocketAddress());
-    channel.close();
+    if (channel.isOpen()) {
+      LOG.info("closing to "+getRemoteName());
+      channel.close();
+    }
   }
 
 }

Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Wed Sep 23 22:34:14 2009
@@ -90,7 +90,6 @@
   def __init__(self, localproto, transceiver):
     self.__localproto = localproto
     self.__transceiver = transceiver
-    self.__established = False
     self.__sendlocaltext = False
     self.__remoteproto = None
 
@@ -106,12 +105,10 @@
   def request(self, msgname, req):
     """Writes a request message and reads a response or error message."""
     processed = False
-    while not self.__established or not processed:
-      processed = True
+    while not processed:
       buf = cStringIO.StringIO()
       encoder = io.Encoder(buf)
-      if not self.__established:
-        self.__writehandshake(encoder)
+      self.__writehandshake(encoder)
       requestmeta = dict()
       _META_WRITER.write(requestmeta, encoder)
       m = self.__localproto.getmessages().get(msgname)
@@ -121,8 +118,7 @@
       self.writerequest(m.getrequest(), req, encoder)
       response = self.__transceiver.transceive(buf.getvalue())
       decoder = io.Decoder(cStringIO.StringIO(response))
-      if not self.__established:
-        self.__readhandshake(decoder)
+      processed = self.__readhandshake(decoder)
     responsemeta = _META_READER.read(decoder)
     m = self.getremote().getmessages().get(msgname)
     if m is None:
@@ -154,13 +150,14 @@
            self.__transceiver.getremotename().__str__() + " is " +
            handshake.match.__str__())
     if handshake.match == _HANDSHAKE_MATCH_BOTH:
-      self.__established = True
+      return True
     elif handshake.match == _HANDSHAKE_MATCH_CLIENT:
       self.__setremote(handshake)
-      self.__established = True
+      return True
     elif handshake.match == _HANDSHAKE_MATCH_NONE:
       self.__setremote(handshake)
       self.__sendlocaltext = True
+      return False
     else:
       raise schema.AvroException("Unexpected match: "+handshake.match.__str__())
 
@@ -188,7 +185,6 @@
 
   def __init__(self, localproto):
     self.__localproto = localproto
-    self.__remotes = weakref.WeakKeyDictionary()
     self.__protocols = dict()
     self.__localhash = self.__localproto.getMD5()
     self.__protocols[self.__localhash] = self.__localproto
@@ -248,16 +244,11 @@
 
 
   def __handshake(self, transceiver, decoder, encoder):
-    remoteproto = self.__remotes.get(transceiver)
-    if remoteproto != None:
-      return remoteproto #already established
     request = _HANDSHAKE_RESPONDER_READER.read(decoder)
     remoteproto = self.__protocols.get(request.clientHash)
     if remoteproto is None and request.clientProtocol is not None:
       remoteproto = protocol.parse(request.clientProtocol)
       self.__protocols[request.clientHash] = remoteproto
-    if remoteproto is not None:
-      self.__remotes[transceiver] = remoteproto
     response = _HandshakeResponse()
     
     if self.__localhash == request.serverHash:

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java Wed Sep 23 22:34:14 2009
@@ -27,13 +27,11 @@
 import org.apache.avro.util.Utf8;
 
 /**
- * An implementation of an RPC metadata plugin API
- * designed for unit testing.  This plugin tests
- * both session and per-call state by passing
- * a string as per-call metadata, slowly building it
- * up at each instrumentation point, testing it as
- * it goes.  Finally, after the call or handshake is
- * complete, the constructed string is tested.
+ * An implementation of an RPC metadata plugin API designed for unit testing.
+ * This plugin tests handshake and call state by passing a string as metadata,
+ * slowly building it up at each instrumentation point, testing it as it goes.
+ * Finally, after the call or handshake is complete, the constructed string is
+ * tested.
  */
 public final class RPCMetaTestPlugin extends RPCPlugin {
   
@@ -46,18 +44,18 @@
   @Override
   public void clientStartConnect(RPCContext context) {
     ByteBuffer buf = ByteBuffer.wrap("ap".getBytes());
-    context.requestSessionMeta().put(key, buf);
+    context.requestHandshakeMeta().put(key, buf);
   }
   
   @Override
   public void serverConnecting(RPCContext context) {
     
-    Assert.assertNotNull(context.requestSessionMeta());
-    Assert.assertNotNull(context.responseSessionMeta());
+    Assert.assertNotNull(context.requestHandshakeMeta());
+    Assert.assertNotNull(context.responseHandshakeMeta());
     
-    if (!context.requestSessionMeta().containsKey(key)) return;
+    if (!context.requestHandshakeMeta().containsKey(key)) return;
     
-    ByteBuffer buf = context.requestSessionMeta().get(key);
+    ByteBuffer buf = context.requestHandshakeMeta().get(key);
     Assert.assertNotNull(buf);
     Assert.assertNotNull(buf.array());
     
@@ -67,18 +65,18 @@
     
     buf = ByteBuffer.wrap((partialstr + "ac").getBytes());
     Assert.assertTrue(buf.remaining() > 0);
-    context.responseSessionMeta().put(key, buf);
+    context.responseHandshakeMeta().put(key, buf);
   }
   
   @Override
   public void clientFinishConnect(RPCContext context) {
-    Map<Utf8,ByteBuffer> sessionMeta = context.responseSessionMeta();
+    Map<Utf8,ByteBuffer> handshakeMeta = context.responseHandshakeMeta();
     
-    Assert.assertNotNull(sessionMeta);
+    Assert.assertNotNull(handshakeMeta);
     
-    if (!sessionMeta.containsKey(key)) return;
+    if (!handshakeMeta.containsKey(key)) return;
     
-    ByteBuffer buf = sessionMeta.get(key);
+    ByteBuffer buf = handshakeMeta.get(key);
     Assert.assertNotNull(buf);
     Assert.assertNotNull(buf.array());
     
@@ -88,9 +86,9 @@
     
     buf = ByteBuffer.wrap((partialstr + "he").getBytes());
     Assert.assertTrue(buf.remaining() > 0);
-    sessionMeta.put(key, buf);
+    handshakeMeta.put(key, buf);
     
-    checkRPCMetaMap(sessionMeta);
+    checkRPCMetaMap(handshakeMeta);
   }
   
   @Override

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java?rev=818293&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolDatagram.java Wed Sep 23 22:34:14 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.util.Random;
+import org.apache.avro.ipc.DatagramServer;
+import org.apache.avro.ipc.DatagramTransceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.junit.Before;
+
+import java.net.InetSocketAddress;
+
+public class TestProtocolDatagram extends TestProtocolSpecific {
+
+  @Before
+  public void testStartServer() throws Exception {
+    server =
+      new DatagramServer(new SpecificResponder(Simple.class, new TestImpl()),
+                         new InetSocketAddress("localhost",
+                                               new Random().nextInt(10000)+10000));
+    client = new DatagramTransceiver(new InetSocketAddress("localhost", server.getPort()));
+    proxy = (Simple)SpecificRequestor.getClient(Simple.class, client);
+  }
+
+}

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=818293&r1=818292&r2=818293&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java Wed Sep 23 22:34:14 2009
@@ -18,6 +18,7 @@
 package org.apache.avro;
 
 import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.SocketServer;
 import org.apache.avro.ipc.SocketTransceiver;
 import org.apache.avro.ipc.Transceiver;
@@ -61,7 +62,7 @@
     }
   }
 
-  protected static SocketServer server;
+  protected static Server server;
   protected static Transceiver client;
   protected static Simple proxy;
 



Mime
View raw message