avro-user mailing list archives

From Doug Cutting <cutt...@apache.org>
Subject Re: Record extensions?
Date Thu, 14 Jun 2012 17:10:05 GMT
On Tue, Jun 12, 2012 at 6:09 PM, Christophe Taton <taton@wibidata.com> wrote:
> In practice, I have a bunch of independent records, each of them carrying at
> most one "extension field".
> I was especially hoping there would be a way to avoid serializing an
> "extension" record twice (once from the record object into a bytes field,
> and then a second time as a bytes field into the destination output
> stream). Ideally, such an extension field should not require its content to
> be bytes, but should accept any record object, so that it is encoded only
> once.
> As I understand it, Avro does not allow me to do this right now. Is this
> correct?
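The double encoding the question describes can be made concrete. In Avro's binary format, a bytes value is written as a zig-zag varint length followed by the raw bytes. An extension stored as bytes is therefore serialized first into a scratch buffer and then copied into the output behind an extra length prefix. The stdlib-only Java sketch below hand-rolls that encoding for a hypothetical extension record with a single string field. It is not Avro's encoder, and the class and method names are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class ExtensionEncodingSketch {
    // Avro-style binary long: zig-zag encode, then emit 7-bit groups, low bits first,
    // setting the high bit on every byte except the last
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro-style bytes (and string): a length prefix followed by the raw bytes
    static void writeBytes(ByteArrayOutputStream out, byte[] b) {
        writeLong(out, b.length);
        out.write(b, 0, b.length);
    }

    // Hypothetical extension record with a single string field
    static void writeExtension(ByteArrayOutputStream out, String value) {
        writeBytes(out, value.getBytes(StandardCharsets.UTF_8));
    }

    // Extension declared with its record schema: encoded once, straight into the output
    static byte[] encodeAsRecord(String value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeExtension(out, value);
        return out.toByteArray();
    }

    // Extension declared as bytes: encoded into a scratch buffer, then copied
    // into the output again as a length-prefixed bytes value
    static byte[] encodeAsBytesField(String value) {
        ByteArrayOutputStream scratch = new ByteArrayOutputStream();
        writeExtension(scratch, value);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeBytes(out, scratch.toByteArray());
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encodeAsRecord("hi").length);      // 3
        System.out.println(encodeAsBytesField("hi").length);  // 4
    }
}
```

The extra byte on the wire is only the outer length prefix. The larger cost is the second pass through a scratch buffer on write, and the matching second decode on the receiving side.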

I think that can be done too if the schema for the extension field is
known when the client opens a connection.  This is a bit like
org.apache.avro.mapred.Pair<K,V>, where in different files K and V can
have different schemas.  You'd construct a GenericRequestor, passing it a
protocol that incorporates the particular extensions in use for that
session.  The server would then subclass GenericResponder, overriding
getLocal() to return the value of getRemote(), so that the remote
protocol containing the extensions is used both to read and to write
data.  (You could also make this work with the specific or reflect APIs.)
This way a different protocol would be used for each client session.
The server's implementation of Responder#respond() would have to handle
these variations.
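Here is a stdlib-only sketch of that arrangement, under stated assumptions. `SessionProtocol`, `MiniResponder` and `ExtensionResponder` are hypothetical stand-ins for Avro's Protocol, Responder and a GenericResponder subclass, not the real classes. Holding the remote protocol in a ThreadLocal is also an assumption of this sketch. It shows only that once getLocal() returns the remote protocol, each session is served with the extension schema its own client declared.

```java
public class PerSessionProtocolSketch {
    // Hypothetical stand-in for a Protocol: a name plus the extension schema
    // one client session uses (compare Pair<K,V> with per-file K and V)
    static final class SessionProtocol {
        final String name;
        final String extensionSchema;

        SessionProtocol(String name, String extensionSchema) {
            this.name = name;
            this.extensionSchema = extensionSchema;
        }
    }

    // Hypothetical stand-in for Responder: the remote protocol is held in a
    // ThreadLocal (an assumption here) while a request is being answered
    static abstract class MiniResponder {
        private static final ThreadLocal<SessionProtocol> REMOTE = new ThreadLocal<>();
        private final SessionProtocol local;

        MiniResponder(SessionProtocol local) { this.local = local; }

        static SessionProtocol getRemote() { return REMOTE.get(); }

        SessionProtocol getLocal() { return local; }

        abstract String respond(String message);

        // Answers one request from a client that declared the given protocol
        String handle(SessionProtocol remote, String message) {
            REMOTE.set(remote);
            try {
                return respond(message);
            } finally {
                REMOTE.remove();
            }
        }
    }

    // Server side: treat the client's protocol (with its extensions) as the local one
    static class ExtensionResponder extends MiniResponder {
        ExtensionResponder(SessionProtocol base) { super(base); }

        @Override
        SessionProtocol getLocal() { return getRemote(); }

        // respond() must cope with whichever extension schema a session declares
        @Override
        String respond(String message) {
            return message + " via " + getLocal().name + " with " + getLocal().extensionSchema;
        }
    }

    public static void main(String[] args) {
        ExtensionResponder server = new ExtensionResponder(new SessionProtocol("Store", "null"));
        System.out.println(server.handle(new SessionProtocol("Store", "GeoExt"), "put"));
        System.out.println(server.handle(new SessionProtocol("Store", "AuditExt"), "put"));
    }
}
```

The two calls to handle() model two client sessions that share one server object but declare different extension schemas.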

The patch below would be required to make sure that Responder always
uses the value of getLocal(), so that you can meaningfully override it.
If this sounds useful, we can file a Jira.
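Caching matters, and this can be sketched without Avro. In the hypothetical classes below, one base class reads its cached `local` field during the handshake, as Responder used its `local` and `localHash` fields before the patch. The other base class calls getLocal(). Only the second honors a subclass override. The class names and the `Store+GeoExt` protocol string are illustrative only.

```java
public class OverridableLocalSketch {
    // Hypothetical per-session protocol a subclass wants to advertise
    static final String SESSION_PROTOCOL = "Store+GeoExt";

    // Before (hypothetical model): the handshake reads the cached field,
    // so a subclass that overrides getLocal() is silently ignored
    static class CachedLocalResponder {
        private final String local;
        CachedLocalResponder(String local) { this.local = local; }
        String getLocal() { return local; }
        String serverProtocolInHandshake() { return local; }
    }

    // After (hypothetical model): the handshake asks getLocal() every time
    static class VirtualLocalResponder {
        private final String local;
        VirtualLocalResponder(String local) { this.local = local; }
        String getLocal() { return local; }
        String serverProtocolInHandshake() { return getLocal(); }
    }

    static String demoBefore() {
        CachedLocalResponder r = new CachedLocalResponder("Store") {
            @Override String getLocal() { return SESSION_PROTOCOL; }
        };
        return r.serverProtocolInHandshake();
    }

    static String demoAfter() {
        VirtualLocalResponder r = new VirtualLocalResponder("Store") {
            @Override String getLocal() { return SESSION_PROTOCOL; }
        };
        return r.serverProtocolInHandshake();
    }

    public static void main(String[] args) {
        System.out.println(demoBefore()); // Store
        System.out.println(demoAfter());  // Store+GeoExt
    }
}
```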


Index: lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
--- lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java	(revision
+++ lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java	(working
@@ -65,14 +65,11 @@
     = new ConcurrentHashMap<MD5,Protocol>();

   private final Protocol local;
-  private final MD5 localHash;
   protected final List<RPCPlugin> rpcMetaPlugins;

   protected Responder(Protocol local) {
     this.local = local;
-    this.localHash = new MD5();
-    localHash.bytes(local.getMD5());
-    protocols.put(localHash, local);
+    protocols.put(new MD5(local.getMD5()), local);
     this.rpcMetaPlugins =
       new CopyOnWriteArrayList<RPCPlugin>();
@@ -211,6 +208,11 @@
       remote = Protocol.parse(request.clientProtocol.toString());
      protocols.put(request.clientHash, remote);
     }
+    if (connection != null && remote != null)
+      connection.setRemote(remote);
+    MD5 localHash = new MD5(getLocal().getMD5());
     HandshakeResponse response = new HandshakeResponse();
     if (localHash.equals(request.serverHash)) {
       response.match =
@@ -220,7 +222,7 @@
         remote == null ? HandshakeMatch.NONE : HandshakeMatch.CLIENT;
     if (response.match != HandshakeMatch.BOTH) {
-      response.serverProtocol = local.toString();
+      response.serverProtocol = getLocal().toString();
       response.serverHash = localHash;

@@ -232,9 +234,6 @@
     handshakeWriter.write(response, out);

-    if (connection != null && response.match != HandshakeMatch.NONE)
-      connection.setRemote(remote);
     return remote;
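The patch also moves `connection.setRemote(remote)` ahead of the point where the handshake consults getLocal(). The plain-Java sketch below shows why the order can matter. It assumes, hypothetically, that the getLocal() override reads the protocol recorded for the connection. `Connection` and `SessionResponder` are illustrative stand-ins, not Avro's Transceiver or Responder. If getLocal() is consulted before the remote protocol is recorded, it sees no protocol at all.

```java
public class HandshakeOrderSketch {
    // Hypothetical stand-in for a transceiver that remembers the client's protocol
    static final class Connection {
        private String remote;
        void setRemote(String remote) { this.remote = remote; }
        String getRemote() { return remote; }
    }

    // Hypothetical responder whose local protocol is the one recorded on the
    // connection currently being handshaken
    static final class SessionResponder {
        private Connection current;

        String getLocal() { return current == null ? null : current.getRemote(); }

        // Returns the server protocol the handshake would advertise;
        // recordRemoteFirst = true follows the patched order
        String handshake(Connection connection, String clientProtocol, boolean recordRemoteFirst) {
            current = connection;
            if (recordRemoteFirst) connection.setRemote(clientProtocol);
            String advertised = getLocal();
            if (!recordRemoteFirst) connection.setRemote(clientProtocol);
            return advertised;
        }
    }

    public static void main(String[] args) {
        SessionResponder responder = new SessionResponder();
        System.out.println(responder.handshake(new Connection(), "Store+GeoExt", false)); // null
        System.out.println(responder.handshake(new Connection(), "Store+GeoExt", true));  // Store+GeoExt
    }
}
```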
