mina-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apali...@apache.org
Subject git commit: minor enhancements to Avro codec
Date Sat, 27 Jul 2013 21:32:53 GMT
Updated Branches:
  refs/heads/trunk f226cd1b0 -> 24f3eaf20


minor enhancements to Avro codec


Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/24f3eaf2
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/24f3eaf2
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/24f3eaf2

Branch: refs/heads/trunk
Commit: 24f3eaf20d3c63e7d6095b57fcc175a8a47161bb
Parents: f226cd1
Author: paliwalashish <paliwalashish@gmail.com>
Authored: Sun Jul 28 03:02:41 2013 +0530
Committer: paliwalashish <paliwalashish@gmail.com>
Committed: Sun Jul 28 03:02:41 2013 +0530

----------------------------------------------------------------------
 .../org/apache/mina/avro/codec/AvroDecoder.java |  7 ++-
 .../org/apache/mina/avro/codec/AvroEncoder.java |  7 ++-
 .../codec/serialization/AvroMessageDecoder.java | 27 +++++++--
 .../codec/serialization/AvroMessageEncoder.java | 59 +++++++++++++++-----
 .../mina/avro/codec/serialization/AvroTest.java | 43 +++++++++-----
 5 files changed, 105 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java
----------------------------------------------------------------------
diff --git a/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java b/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java
index 997d73a..25ce582 100644
--- a/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java
+++ b/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java
@@ -21,6 +21,7 @@
 package org.apache.mina.avro.codec;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.mina.avro.codec.serialization.AvroMessageDecoder;
 import org.apache.mina.codec.delimited.IoBufferDecoder;
 import org.apache.mina.codec.delimited.SizePrefixedDecoder;
@@ -29,12 +30,12 @@ import org.apache.mina.codec.delimited.ints.VarInt;
 /**
  *
  */
-public class AvroDecoder<GenericRecord> extends SizePrefixedDecoder<GenericRecord>
{
+public class AvroDecoder<T extends GenericContainer> extends SizePrefixedDecoder<T>
{
 
     private Schema schema;
 
-    public AvroDecoder(IoBufferDecoder<Integer> sizeDecoder, IoBufferDecoder<GenericRecord>
payloadDecoder, Schema schema) {
-        super(new VarInt().getDecoder(), new AvroMessageDecoder<GenericRecord>(schema));
+    public AvroDecoder(IoBufferDecoder<Integer> sizeDecoder, IoBufferDecoder<T>
payloadDecoder, Schema schema) {
+        super(new VarInt().getDecoder(), new AvroMessageDecoder<T>(schema));
         this.schema = schema;
     }
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java
----------------------------------------------------------------------
diff --git a/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java b/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java
index 51d3b75..22c9467 100644
--- a/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java
+++ b/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java
@@ -19,6 +19,7 @@
  */
 package org.apache.mina.avro.codec;
 
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.mina.avro.codec.serialization.AvroMessageEncoder;
 import org.apache.mina.codec.delimited.ByteBufferEncoder;
@@ -28,8 +29,8 @@ import org.apache.mina.codec.delimited.ints.VarInt;
 /**
  *
  */
-public class AvroEncoder<IN extends GenericRecord> extends SizePrefixedEncoder<GenericRecord>
{
-    public AvroEncoder(ByteBufferEncoder<Integer> sizeEncoder, ByteBufferEncoder<GenericRecord>
payloadEncoder) {
-        super(new VarInt().getEncoder(), new AvroMessageEncoder<GenericRecord>());
+public class AvroEncoder<T extends GenericContainer> extends SizePrefixedEncoder<T>
{
+    public AvroEncoder(ByteBufferEncoder<Integer> sizeEncoder, ByteBufferEncoder<T>
payloadEncoder) {
+        super(new VarInt().getEncoder(), new AvroMessageEncoder<T>());
     }
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java
----------------------------------------------------------------------
diff --git a/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java
b/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java
index 2df618e..013fea1 100644
--- a/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java
+++ b/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java
@@ -21,19 +21,29 @@
 package org.apache.mina.avro.codec.serialization;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.mina.codec.IoBuffer;
+import org.apache.mina.codec.ProtocolDecoderException;
 import org.apache.mina.codec.delimited.IoBufferDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
 /**
+ * Avro Message Decoder
  *
+ * Uses ReflectDatumReader to read the data from the stream
  */
-public class AvroMessageDecoder<GenericRecord> extends IoBufferDecoder<GenericRecord>
{
+public class AvroMessageDecoder<T extends GenericContainer> extends IoBufferDecoder<T>
{
+
+    // Logger
+    public static final Logger LOG = LoggerFactory.getLogger(AvroMessageDecoder.class);
 
     private Schema schema;
 
@@ -42,18 +52,23 @@ public class AvroMessageDecoder<GenericRecord> extends IoBufferDecoder<GenericRe
      * @param schema
      */
     public AvroMessageDecoder(Schema schema) {
+        if(schema == null) {
+            LOG.error("Avro Schema passed cannot be null");
+            throw new IllegalArgumentException("Avro Schema cannot be null");
+        }
         this.schema = schema;
     }
 
     @Override
-    public GenericRecord decode(IoBuffer input) {
+    public T decode(IoBuffer input) {
         BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(input.array(), null);
-        GenericDatumReader<GenericRecord> recordGenericDatumReader = new GenericDatumReader<GenericRecord>(schema);
-        GenericRecord result = null;
+        ReflectDatumReader<T> reader = new ReflectDatumReader<T>(schema);
+        T result = null;
         try {
-            result = recordGenericDatumReader.read(null, binaryDecoder);
+            result = reader.read(null, binaryDecoder);
         }catch (IOException ioEx) {
-            ioEx.printStackTrace();
+            LOG.error("Error while decoding", ioEx);
+            throw new ProtocolDecoderException(ioEx.getMessage());
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java
----------------------------------------------------------------------
diff --git a/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java
b/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java
index ecddad2..3ae9cc8 100644
--- a/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java
+++ b/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java
@@ -20,43 +20,76 @@
 
 package org.apache.mina.avro.codec.serialization;
 
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.mina.codec.ProtocolEncoderException;
 import org.apache.mina.codec.delimited.ByteBufferEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
+ * Avro Message encoder
  *
+ * It can be used to handle both a Generic Record as well as Specific Record
  */
-public class AvroMessageEncoder<OUT extends GenericRecord> extends ByteBufferEncoder<GenericRecord>
{
+public class AvroMessageEncoder<T extends GenericContainer> extends ByteBufferEncoder<T>
{
+
+    // Logger
+    public static final Logger LOG = LoggerFactory.getLogger(AvroMessageEncoder.class);
 
     private ByteBuffer encodedMessage;
 
     @Override
-    public int getEncodedSize(GenericRecord message) {
+    public int getEncodedSize(T message) {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(message.getSchema());
-        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-        try {
-            writer.write(message, encoder);
-            encoder.flush();
-            byte[] encoded = out.toByteArray();
-            encodedMessage = ByteBuffer.wrap(encoded);
-            out.close();
-        } catch (IOException ioEx) {
-            // :(
+
+        // Need to check for writer
+        if(message instanceof GenericRecord) {
+            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(message.getSchema());
+            Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+            try {
+                writer.write((GenericRecord)message, encoder);
+                encoder.flush();
+                byte[] encoded = out.toByteArray();
+                encodedMessage = ByteBuffer.wrap(encoded);
+                out.close();
+            } catch (IOException ioEx) {
+                LOG.error("error while marshalling", ioEx);
+                throw new ProtocolEncoderException(ioEx.getMessage());
+            }
+        } else if (message instanceof SpecificRecord) {
+            DatumWriter<T> writer = new SpecificDatumWriter<T>(message.getSchema());
+            Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+            try {
+                writer.write(message, encoder);
+                encoder.flush();
+                byte[] encoded = out.toByteArray();
+                encodedMessage = ByteBuffer.wrap(encoded);
+                out.close();
+            } catch (IOException ioEx) {
+                LOG.error("error while marshalling", ioEx);
+                throw new ProtocolEncoderException(ioEx.getMessage());
+            }
+        } else {
+            LOG.warn("Unknown object type, serialization method not known for {}", message.getClass());
+            throw new ProtocolEncoderException(message.getClass() + " cannot be Serialized");
         }
+
         return encodedMessage != null ? encodedMessage.capacity() : -1;
     }
 
     @Override
-    public void writeTo(GenericRecord message, ByteBuffer buffer) {
+    public void writeTo(T message, ByteBuffer buffer) {
         buffer.put(encodedMessage);
     }
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java
----------------------------------------------------------------------
diff --git a/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java b/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java
index 5a754fc..8d2c754 100644
--- a/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java
+++ b/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java
@@ -23,6 +23,7 @@ package org.apache.mina.avro.codec.serialization;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.mina.avro.generated.User;
 import org.apache.mina.codec.IoBuffer;
 import org.apache.mina.codec.delimited.ByteBufferEncoder;
 import org.apache.mina.codec.delimited.IoBufferDecoder;
@@ -33,6 +34,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  *
  */
@@ -59,26 +62,40 @@ public class AvroTest extends GenericSerializerTest {
     }
 
     @Override
-    public List<GenericRecord> getObjects() {
-        List<GenericRecord> genericRecordList = new ArrayList<GenericRecord>(1);
-        GenericRecord record1 = new GenericData.Record(SCHEMA);
-        record1.put("name", "Black Jack");
-        record1.put("favorite_number", 11);
-        record1.put("favorite_color", "Black");
-        genericRecordList.add(record1);
+    public List<User> getObjects() {
+        List<User> genericRecordList = new ArrayList<User>(1);
+        User user1 = new User("Red User", 11, "Red");
+        genericRecordList.add(user1);
 
         return genericRecordList;
     }
 
     @Test
-    public void testMessage() throws Exception {
-        ByteBufferEncoder<GenericRecord> encoder = getEncoder();
-        AvroMessageDecoder<GenericRecord> decoder = new AvroMessageDecoder<GenericRecord>(SCHEMA);
+    public void testUser() throws Exception {
+        ByteBufferEncoder<User> encoder = new AvroMessageEncoder<User>();
+        AvroMessageDecoder<User> decoder = new AvroMessageDecoder<User>(SCHEMA);
 
-        for (GenericRecord object : getObjects()) {
-            GenericRecord message = decoder.decode(IoBuffer
+        for (User object : getObjects()) {
+            User message = decoder.decode(IoBuffer
                     .wrap(encoder.encode(object)));
-            System.out.println(message);
+            assertEquals(getObjects().get(0), message);
         }
     }
+
+    @Test
+    public void testGenericMessage() throws Exception {
+        GenericRecord record1 = new GenericData.Record(SCHEMA);
+        record1.put("name", "Black Jack");
+        record1.put("favorite_number", 11);
+        record1.put("favorite_color", "Black");
+
+        ByteBufferEncoder<GenericRecord> encoder = new AvroMessageEncoder<GenericRecord>();
+        AvroMessageDecoder<User> decoder = new AvroMessageDecoder<User>(SCHEMA);
+
+        User message = decoder.decode(IoBuffer.wrap(encoder.encode(record1)));
+        assertEquals(record1.get("name"), message.getName());
+        assertEquals(record1.get("favorite_number"), message.getFavoriteNumber());
+        assertEquals(record1.get("favorite_color"), message.getFavoriteColor());
+    }
+
 }


Mime
View raw message