camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [2/3] git commit: CAMEL-6575 Enhancements for camel-avro with thanks to Vitalii
Date Mon, 29 Jul 2013 05:54:25 GMT
CAMEL-6575 Enhancements for camel-avro with thanks to Vitalii


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6783ceab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6783ceab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6783ceab

Branch: refs/heads/master
Commit: 6783ceab5a24c030e5bcac8f70d5994b4b6d40bb
Parents: 86077e4
Author: Willem Jiang <ningjiang@apache.org>
Authored: Mon Jul 29 11:27:20 2013 +0800
Committer: Willem Jiang <ningjiang@apache.org>
Committed: Mon Jul 29 11:27:20 2013 +0800

----------------------------------------------------------------------
 .../camel/component/avro/AvroComponent.java     |  69 ++++++-
 .../component/avro/AvroComponentException.java  |  39 ++++
 .../camel/component/avro/AvroConfiguration.java |  50 ++++-
 .../camel/component/avro/AvroConstants.java     |   1 +
 .../camel/component/avro/AvroConsumer.java      |  14 +-
 .../camel/component/avro/AvroEndpoint.java      |  20 +-
 .../camel/component/avro/AvroHttpConsumer.java  |  46 -----
 .../camel/component/avro/AvroHttpEndpoint.java  |  17 --
 .../camel/component/avro/AvroListener.java      | 186 +++++++++++++++++++
 .../camel/component/avro/AvroNettyConsumer.java |  48 -----
 .../camel/component/avro/AvroNettyEndpoint.java |  19 --
 .../camel/component/avro/AvroProducer.java      |  28 ++-
 .../component/avro/AvroReflectRequestor.java    |  34 ++++
 .../component/avro/AvroReflectResponder.java    |  37 ++++
 .../camel/component/avro/AvroRequestor.java     |  30 ---
 .../camel/component/avro/AvroResponder.java     |  75 --------
 .../component/avro/AvroSpecificRequestor.java   |  30 +++
 .../component/avro/AvroSpecificResponder.java   |  37 ++++
 .../org/apache/camel/avro/test/TestPojo.java    |  30 +++
 .../apache/camel/avro/test/TestReflection.java  |  35 ++++
 .../camel/avro/test/TestReflectionImpl.java     |  61 ++++++
 .../component/avro/AvroConsumerTestSupport.java | 130 +++++++++++--
 .../component/avro/AvroHttpConsumerTest.java    |  45 ++++-
 .../component/avro/AvroHttpProducerTest.java    |  34 +++-
 .../component/avro/AvroNettyConsumerTest.java   |  43 ++++-
 .../component/avro/AvroNettyProducerTest.java   |  50 ++++-
 .../avro/AvroNettySpringConsumerTest.java       |   2 +
 .../component/avro/AvroProducerTestSupport.java |  80 ++++++--
 .../camel/component/avro/AvroSettingsTest.java  |  54 ++++++
 .../camel/component/avro/AvroTestSupport.java   |  25 ++-
 .../component/avro/processors/GetProcessor.java |   4 +
 .../processors/ReflectionInOnlyProcessor.java   |  51 +++++
 .../processors/ReflectionInOutProcessor.java    |  41 ++++
 .../camel/component/avro/avro-http-consumer.xml |  11 ++
 .../camel/component/avro/avro-http-producer.xml |  30 +++
 .../component/avro/avro-netty-consumer.xml      |  50 +++++
 .../component/avro/avro-netty-producer.xml      |  30 +++
 37 files changed, 1285 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
index 48cd68f..db10421 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
@@ -18,9 +18,13 @@ package org.apache.camel.component.avro;
 
 import java.lang.reflect.Field;
 import java.net.URI;
+import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.avro.Protocol;
+import org.apache.avro.reflect.ReflectData;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -30,6 +34,7 @@ import org.apache.camel.util.URISupport;
 public class AvroComponent extends DefaultComponent {
 
     private AvroConfiguration configuration;
+    private ConcurrentMap<String, AvroListener> listenerRegistry = new ConcurrentHashMap<String, AvroListener>();
 
     public AvroComponent() {
     }
@@ -81,17 +86,71 @@ public class AvroComponent extends DefaultComponent {
         if (config.getProtocol() == null && config.getProtocolClassName() != null) {
             Class<?> protocolClass = getCamelContext().getClassResolver().resolveClass(config.getProtocolClassName());
             if (protocolClass != null) {
-                Field f = protocolClass.getField("PROTOCOL");
-                if (f != null) {
-                    Protocol protocol = (Protocol) f.get(null);
-                    config.setProtocol(protocol);
-                }
+            	try {
+            		Field f = protocolClass.getField("PROTOCOL");
+            		if (f != null) {
+                        Protocol protocol = (Protocol) f.get(null);
+                        config.setProtocol(protocol);
+                    }
+            	} catch(NoSuchFieldException e) {
+            		ReflectData reflectData = ReflectData.get();
+                	config.setProtocol(reflectData.getProtocol(protocolClass));
+                	config.setReflectionProtocol(true);
+            	}
             }
         }
 
         if (config.getProtocol() == null) {
             throw new IllegalArgumentException("Avro configuration does not contain protocol");
         }
+
+        if (config.getMessageName() != null && !config.getProtocol().getMessages().containsKey(config.getMessageName())) {
+            throw new IllegalArgumentException("Message " + config.getMessageName() + " is not defined in protocol");
+        }
+
+        if (config.isSingleParameter()) {
+            Map<String, Protocol.Message> messageMap = config.getProtocol().getMessages();
+            Iterable<Protocol.Message> messagesToCheck =  config.getMessageName() == null ?
+                    messageMap.values() :
+                    Collections.singleton(messageMap.get(config.getMessageName()));
+            for (Protocol.Message message: messagesToCheck) {
+                if (message.getRequest().getFields().size() != 1) {
+                    throw new IllegalArgumentException("Single parameter option can't be used with message "
+                            + message.getName() + " because it has " + message.getRequest().getFields().size() +
+                            " parameters defined"
+                    );
+                }
+            }
+        }
+    }
+    
+    /**
+     * Registers new responder with uri as key. Registers consumer in responder.
+     * In case if responder is already registered by this uri then just registers consumer.
+     * 
+     * @param uri			URI of the endpoint without message name
+     * @param messageName	message name
+     * @param consumer		consumer that will be registered in providers` registry
+     * @throws Exception
+     */
+    public void register(String uri, String messageName, AvroConsumer consumer) throws Exception {
+    	AvroListener listener = listenerRegistry.get(uri);
+    	if(listener == null) {
+    		listener = new AvroListener(consumer.getEndpoint());
+    		listenerRegistry.put(uri, listener);
+    	}
+    	listener.register(messageName, consumer);
+    }
+    
+    /**
+     * Calls unregister of consumer by appropriate message name.
+     * In case if all consumers are unregistered then it removes responder from the registry.
+     *
+     * @param uri			URI of the endpoint without message name
+     * @param messageName	message name
+     */
+    public void unregister(String uri, String messageName) {
+    	if(listenerRegistry.get(uri).unregister(messageName)) listenerRegistry.remove(uri);
     }
 
     public AvroConfiguration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java
new file mode 100644
index 0000000..677259a
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+public class AvroComponentException extends Exception {
+
+	private static final long serialVersionUID = 8915917806189741165L;
+	
+	public AvroComponentException() {
+		super();
+	}
+
+	public AvroComponentException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public AvroComponentException(String message) {
+		super(message);
+	}
+
+	public AvroComponentException(Throwable cause) {
+		super(cause);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
index 43c68bd..ee138b3 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import org.apache.avro.Protocol;
 
 import org.apache.camel.RuntimeCamelException;
+import org.apache.commons.lang.StringUtils;
+import static org.apache.camel.component.avro.AvroConstants.*;
 
 public class AvroConfiguration implements Cloneable {
 
@@ -31,8 +33,12 @@ public class AvroConfiguration implements Cloneable {
     private String protocolLocation;
     private String protocolClassName;
     private String transport;
+    private String messageName;
+    private String uriAuthority;
+    private boolean reflectionProtocol;
+    private boolean singleParameter;
 
-    public AvroConfiguration copy() {
+	public AvroConfiguration copy() {
         try {
             AvroConfiguration answer = (AvroConfiguration) clone();
             return answer;
@@ -44,12 +50,20 @@ public class AvroConfiguration implements Cloneable {
     public void parseURI(URI uri, Map<String, Object> parameters, AvroComponent component) throws Exception {
         transport = uri.getScheme();
 
-        if ((!transport.equalsIgnoreCase("http")) && (!transport.equalsIgnoreCase("netty"))) {
+        if ((!AVRO_HTTP_TRANSPORT.equalsIgnoreCase(transport)) && (!AVRO_NETTY_TRANSPORT.equalsIgnoreCase(transport))) {
             throw new IllegalArgumentException("Unrecognized Avro IPC transport: " + protocol + " for uri: " + uri);
         }
 
         setHost(uri.getHost());
         setPort(uri.getPort());
+        
+        if((uri.getPath() != null) && (StringUtils.indexOf(uri.getPath(), AVRO_MESSAGE_NAME_SEPARATOR) != -1)) {
+        	String path = StringUtils.substringAfter(uri.getPath(), AVRO_MESSAGE_NAME_SEPARATOR);
+        	if(!path.contains(AVRO_MESSAGE_NAME_SEPARATOR)) setMessageName(path);
+        	else throw new IllegalArgumentException("Unrecognized Avro message name: " + path + " for uri: " + uri);
+        }
+        
+        setUriAuthority(uri.getAuthority());
     }
 
     public String getHost() {
@@ -99,4 +113,36 @@ public class AvroConfiguration implements Cloneable {
     public void setProtocolClassName(String protocolClassName) {
         this.protocolClassName = protocolClassName;
     }
+
+	public String getMessageName() {
+		return messageName;
+	}
+
+	public void setMessageName(String messageName) {
+		this.messageName = messageName;
+	}
+
+	public String getUriAuthority() {
+		return uriAuthority;
+	}
+
+	public void setUriAuthority(String uriAuthority) {
+		this.uriAuthority = uriAuthority;
+	}
+	
+	public boolean isReflectionProtocol() {
+		return reflectionProtocol;
+	}
+
+	public void setReflectionProtocol(boolean isReflectionProtocol) {
+		this.reflectionProtocol = isReflectionProtocol;
+	}
+
+	public boolean isSingleParameter() {
+		return singleParameter;
+	}
+
+	public void setSingleParameter(boolean singleParameter) {
+		this.singleParameter = singleParameter;
+	}
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
index f5d87fa..948af34 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
@@ -20,6 +20,7 @@ public final class AvroConstants {
 
     public static final transient String AVRO_NETTY_TRANSPORT = "netty";
     public static final transient String AVRO_HTTP_TRANSPORT = "http";
+    public static final transient String AVRO_MESSAGE_NAME_SEPARATOR = "/";
 
     public static final transient String AVRO_MESSAGE_NAME = "CamelAvroMessageName";
 

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
index a4f9514..5375c2d 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
@@ -20,7 +20,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
-public abstract class AvroConsumer extends DefaultConsumer {
+public class AvroConsumer extends DefaultConsumer {
 
     public AvroConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -30,4 +30,16 @@ public abstract class AvroConsumer extends DefaultConsumer {
     public AvroEndpoint getEndpoint() {
         return (AvroEndpoint) super.getEndpoint();
     }
+    
+    @Override
+    protected void doStart() throws Exception {
+    	super.doStart();
+    	((AvroComponent) getEndpoint().getComponent()).register(getEndpoint().getConfiguration().getUriAuthority(), getEndpoint().getConfiguration().getMessageName(), this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        ((AvroComponent) getEndpoint().getComponent()).unregister(getEndpoint().getConfiguration().getUriAuthority(), getEndpoint().getConfiguration().getMessageName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
index 9b28666..0da6796 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
@@ -20,8 +20,10 @@ import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
 
 import org.apache.camel.Component;
+import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultEndpoint;
 
 public abstract class AvroEndpoint extends DefaultEndpoint {
@@ -56,12 +58,24 @@ public abstract class AvroEndpoint extends DefaultEndpoint {
     public boolean isSingleton() {
         return true;
     }
+    
+    /**
+     * Creates a new <a
+     * href="http://camel.apache.org/event-driven-consumer.html">Event
+     * Driven Consumer</a> which consumes messages from the endpoint using the
+     * given processor
+     *
+     * @param processor the given processor
+     * @return a newly created consumer
+     * @throws Exception can be thrown
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new AvroConsumer(this, processor);
+    }
 
     public AvroConfiguration getConfiguration() {
         return configuration;
     }
 
-    public Protocol getProtocol() {
-        return configuration.getProtocol();
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
deleted file mode 100644
index 7f2a230..0000000
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.avro;
-
-import org.apache.avro.ipc.HttpServer;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
-
-public class AvroHttpConsumer extends AvroConsumer {
-
-    HttpServer server;
-
-    public AvroHttpConsumer(Endpoint endpoint, Processor processor) {
-        super(endpoint, processor);
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        AvroConfiguration configuration = getEndpoint().getConfiguration();
-        server = new HttpServer(new AvroResponder(this), configuration.getPort());
-        server.start();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        if (server != null) {
-            server.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
index 6db3c7f..ee81796 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
@@ -45,21 +45,4 @@ public class AvroHttpEndpoint extends AvroEndpoint {
     public Producer createProducer() throws Exception {
         return new AvroHttpProducer(this);
     }
-
-    /**
-     * Creates a new <a
-     * href="http://camel.apache.org/event-driven-consumer.html">Event
-     * Driven Consumer</a> which consumes messages from the endpoint using the
-     * given processor
-     *
-     * @param processor the given processor
-     * @return a newly created consumer
-     * @throws Exception can be thrown
-     */
-    @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
-        AvroHttpConsumer answer = new AvroHttpConsumer(this, processor);
-        configureConsumer(answer);
-        return answer;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java
new file mode 100644
index 0000000..546e490
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java
@@ -0,0 +1,186 @@
+package org.apache.camel.component.avro;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.specific.SpecificData;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.commons.lang.StringUtils;
+import org.mortbay.log.Log;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.camel.component.avro.AvroConstants.AVRO_HTTP_TRANSPORT;
+import static org.apache.camel.component.avro.AvroConstants.AVRO_NETTY_TRANSPORT;
+
+/**
+ * This class holds server that listen to given protocol:host:port combination and dispatches messages to
+ * different routes mapped.
+ */
+public class AvroListener {
+
+    private ConcurrentMap<String, AvroConsumer> consumerRegistry = new ConcurrentHashMap<String, AvroConsumer>();
+    private AvroConsumer defaultConsumer;
+    private final Server server;
+    private final AvroConfiguration configuration;
+
+    public AvroListener(AvroEndpoint endpoint)  throws Exception {
+        configuration = endpoint.getConfiguration();
+        server = initAndStartServer(endpoint.getConfiguration());
+    }
+
+    /**
+     * Initializes and starts http or netty server on basis of transport protocol from configuration.
+     *
+     *
+     * @param configuration
+     * @return			Initialized and started server
+     * @throws java.io.IOException
+     */
+    private Server initAndStartServer(AvroConfiguration configuration) throws Exception {
+        SpecificResponder responder;
+        Server server;
+
+        if(configuration.isReflectionProtocol()) {
+            responder = new AvroReflectResponder(configuration.getProtocol(), this);
+        }
+        else {
+            responder = new AvroSpecificResponder(configuration.getProtocol(), this);
+        }
+
+
+        if(AVRO_HTTP_TRANSPORT.equalsIgnoreCase(configuration.getTransport()))
+            server = new HttpServer(responder, configuration.getPort());
+        else  if(AVRO_NETTY_TRANSPORT.equalsIgnoreCase(configuration.getTransport()))
+            server = new NettyServer(responder, new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        else throw new IllegalArgumentException("Unknown transport " + configuration.getTransport());
+
+        server.start();
+
+        return server;
+    }
+
+    /**
+     * Registers consumer by appropriate message name as key in registry.
+     *
+     * @param messageName	message name
+     * @param consumer		avro consumer
+     * @throws AvroComponentException
+     */
+    public void register(String messageName, AvroConsumer consumer) throws AvroComponentException {
+        if (messageName == null) {
+            if(this.defaultConsumer != null)
+                throw new AvroComponentException("Default consumer already registered for uri: " + consumer.getEndpoint().getEndpointUri());
+            this.defaultConsumer = consumer;
+        } else {
+            if (consumerRegistry.putIfAbsent(messageName, consumer) != null) {
+                throw new AvroComponentException("Consumer already registered for message: " + messageName + " and uri: " + consumer.getEndpoint().getEndpointUri());
+            }
+        }
+    }
+
+    /**
+     * Unregisters consumer by message name.
+     * Stops server in case if all consumers are unregistered and default consumer is absent or stopped.
+     *
+     * @param messageName message name
+     * @return true if all consumers are unregistered and defaultConsumer is absent or null.
+     *         It means that this responder can be unregistered.
+     */
+    public boolean unregister(String messageName) {
+        if(!StringUtils.isEmpty(messageName)) {
+            if(consumerRegistry.remove(messageName) == null)
+                Log.warn("Consumer with message name " + messageName + " was already unregistered.");
+        }
+        else defaultConsumer = null;
+
+        if((defaultConsumer == null) && (consumerRegistry.isEmpty())) {
+            if (server != null) {
+                server.close();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public Object respond(Protocol.Message message, Object request, SpecificData data) throws Exception {
+        AvroConsumer consumer = this.defaultConsumer;
+        if(this.consumerRegistry.containsKey(message.getName()))
+            consumer = this.consumerRegistry.get(message.getName());
+
+        if(consumer == null) throw new AvroComponentException("No consumer defined for message: " + message.getName());
+
+        Object params = extractParams(message, request, consumer.getEndpoint().getConfiguration().isSingleParameter(), data);
+
+        return processExchange(consumer, message, params);
+    }
+
+    /**
+     * Extracts parameters from RPC call to List or converts to object of appropriate type
+     * if only one parameter set.
+     *
+     * @param	message Avro message
+     * @param	request Avro request
+     * @param	singleParameter Indicates that called method has single parameter
+     * @param	dataResolver Extracts type of parameters in call
+     * @return	Parameters of RPC method invocation
+     */
+    private static Object extractParams(Protocol.Message message, Object request, boolean singleParameter, SpecificData dataResolver) {
+
+        if(singleParameter) {
+            Schema.Field field = message.getRequest().getFields().get(0);
+            return dataResolver.getField(request, field.name(), field.pos());
+        } else {
+            int i = 0;
+            Object[] params =  new Object[message.getRequest().getFields().size()];
+            for (Schema.Field param : message.getRequest().getFields()) {
+                params[i] = dataResolver.getField(request, param.name(), param.pos());
+                i++;
+            }
+            return params;
+        }
+    }
+
+    /**
+     * Creates exchange and processes it.
+     *
+     * @param consumer	Holds processor and exception handler
+     * @param message	Message on which exchange is created
+     * @param params	Params of exchange
+     * @return			Response of exchange processing
+     * @throws Exception
+     */
+    private static Object processExchange(AvroConsumer consumer, Protocol.Message message, Object params) throws Exception {
+        Object response;
+        Exchange exchange = consumer.getEndpoint().createExchange(message, params);
+
+        try {
+            consumer.getProcessor().process(exchange);
+        } catch (Throwable e) {
+            consumer.getExceptionHandler().handleException(e);
+        }
+
+        if (ExchangeHelper.isOutCapable(exchange)) {
+            response = exchange.getOut().getBody();
+        } else {
+            response = null;
+        }
+
+        boolean failed = exchange.isFailed();
+        if (failed) {
+            if (exchange.getException() != null) {
+                throw exchange.getException();
+            } else {
+                // failed and no exception, must be a fault
+                throw new AvroComponentException("Camel processing error.");
+            }
+        }
+        return response;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
deleted file mode 100644
index 72cb30d..0000000
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.avro;
-
-import java.net.InetSocketAddress;
-
-import org.apache.avro.ipc.NettyServer;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
-
-public class AvroNettyConsumer extends AvroConsumer {
-
-    NettyServer server;
-
-    public AvroNettyConsumer(Endpoint endpoint, Processor processor) {
-        super(endpoint, processor);
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        AvroConfiguration configuration = getEndpoint().getConfiguration();
-        server = new NettyServer(new AvroResponder(this), new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-        server.start();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        if (server != null) {
-            server.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
index cb27955..261ef11 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
@@ -17,8 +17,6 @@
 package org.apache.camel.component.avro;
 
 import org.apache.camel.Component;
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 
 public class AvroNettyEndpoint extends AvroEndpoint {
@@ -45,21 +43,4 @@ public class AvroNettyEndpoint extends AvroEndpoint {
     public Producer createProducer() throws Exception {
         return new AvroNettyProducer(this);
     }
-
-    /**
-     * Creates a new <a
-     * href="http://camel.apache.org/event-driven-consumer.html">Event
-     * Driven Consumer</a> which consumes messages from the endpoint using the
-     * given processor
-     *
-     * @param processor the given processor
-     * @return a newly created consumer
-     * @throws Exception can be thrown
-     */
-    @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
-        AvroNettyConsumer answer = new AvroNettyConsumer(this, processor);
-        configureConsumer(answer);
-        return answer;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
index b7718bc..b3ba04c 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
@@ -19,12 +19,12 @@ package org.apache.camel.component.avro;
 import org.apache.avro.ipc.Callback;
 import org.apache.avro.ipc.Requestor;
 import org.apache.avro.ipc.Transceiver;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.commons.lang.StringUtils;
 
 public abstract class AvroProducer extends DefaultAsyncProducer implements ServicePoolAware {
 
@@ -41,8 +41,29 @@ public abstract class AvroProducer extends DefaultAsyncProducer implements Servi
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         Object request = exchange.getIn().getBody();
 
+        AvroConfiguration configuration = getEndpoint().getConfiguration();
+        if (transceiver == null) {
+            try {
+                transceiver = createTransceiver();
+                if(configuration.isReflectionProtocol())
+                    requestor = new AvroReflectRequestor(configuration.getProtocol(), transceiver);
+                else
+                    requestor = new AvroSpecificRequestor(configuration.getProtocol(), transceiver);
+            } catch (Exception e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        }
+
         try {
-            requestor.request(exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class), wrapObjectToArray(request), new Callback<Object>() {
+        	String messageName;
+        	if(!StringUtils.isEmpty(exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class)))
+        		messageName = exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class);
+        	else
+        		messageName = configuration.getMessageName();
+        	
+            requestor.request(messageName, wrapObjectToArray(request), new Callback<Object>() {
                 @Override
                 public void handleResult(Object result) {
                     // got result from avro, so set it on the exchange and invoke the callback
@@ -88,8 +109,6 @@ public abstract class AvroProducer extends DefaultAsyncProducer implements Servi
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        transceiver = createTransceiver();
-        requestor = new AvroRequestor(getEndpoint().getProtocol(), transceiver);
     }
 
     @Override
@@ -97,6 +116,7 @@ public abstract class AvroProducer extends DefaultAsyncProducer implements Servi
         super.doStop();
         if (transceiver != null) {
             transceiver.close();
+            transceiver = null;
         }
         requestor = null;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java
new file mode 100644
index 0000000..1d6fc2a
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.reflect.ReflectRequestor;
+
+public class AvroReflectRequestor extends ReflectRequestor {
+
+	public AvroReflectRequestor(Class<?> iface, Transceiver transceiver) throws IOException {
+		super(iface, transceiver);
+	}
+
+	public AvroReflectRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
+		super(protocol, transceiver);
+	}
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java
new file mode 100644
index 0000000..c841021
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.reflect.ReflectResponder;
+import org.apache.avro.reflect.ReflectData;
+
+public class AvroReflectResponder extends ReflectResponder {
+    private AvroListener listener;
+
+	
+	public AvroReflectResponder(Protocol protocol, AvroListener listener)  throws Exception {
+        super(protocol, null);
+        this.listener = listener;
+    }
+
+	@Override
+    public Object respond(Protocol.Message message, Object request) throws Exception {
+    	return listener.respond(message, request, ReflectData.get());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
deleted file mode 100644
index dd54c20..0000000
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Protocol;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-
-public class AvroRequestor extends SpecificRequestor {
-
-    public AvroRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
-        super(protocol, transceiver);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
deleted file mode 100644
index 4d0cd40..0000000
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.avro;
-
-import org.apache.avro.Protocol;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.ipc.specific.SpecificResponder;
-import org.apache.avro.specific.SpecificData;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.util.ExchangeHelper;
-
-public class AvroResponder extends SpecificResponder {
-
-    private AvroConsumer consumer;
-
-    public AvroResponder(AvroConsumer consumer) {
-        super(consumer.getEndpoint().getProtocol(), null);
-        this.consumer = consumer;
-    }
-
-    @Override
-    public Object respond(Protocol.Message message, Object request) throws Exception {
-        Object response;
-        int numParams = message.getRequest().getFields().size();
-        Object[] params = new Object[numParams];
-        Class<?>[] paramTypes = new Class[numParams];
-        int i = 0;
-        for (Schema.Field param : message.getRequest().getFields()) {
-            params[i] = ((GenericRecord) request).get(param.name());
-            paramTypes[i] = SpecificData.get().getClass(param.schema());
-            i++;
-        }
-        Exchange exchange = consumer.getEndpoint().createExchange(message, params);
-
-        try {
-            consumer.getProcessor().process(exchange);
-        } catch (Throwable e) {
-            consumer.getExceptionHandler().handleException(e);
-        }
-
-        if (ExchangeHelper.isOutCapable(exchange)) {
-            response = exchange.getOut().getBody();
-        } else {
-            response = null;
-        }
-
-        boolean failed = exchange.isFailed();
-        if (failed) {
-            if (exchange.getException() != null) {
-                response = exchange.getException();
-            } else {
-                // failed and no exception, must be a fault
-                response = exchange.getOut().getBody();
-            }
-        }
-        return response;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java
new file mode 100644
index 0000000..1d6b5f6
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+
+public class AvroSpecificRequestor extends SpecificRequestor {
+
+    public AvroSpecificRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
+        super(protocol, transceiver);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java
new file mode 100644
index 0000000..98a5d82
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.specific.SpecificData;
+
+public class AvroSpecificResponder extends SpecificResponder {
+    private AvroListener listener;
+
+
+    public AvroSpecificResponder(Protocol protocol, AvroListener listener)  throws Exception {
+        super(protocol, null);
+        this.listener = listener;
+    }
+
+    @Override
+    public Object respond(Protocol.Message message, Object request) throws Exception {
+        return listener.respond(message, request, SpecificData.get());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java
new file mode 100644
index 0000000..d7290d8
--- /dev/null
+++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.avro.test;
+
+public class TestPojo {
+
+	private String pojoName;
+
+	public String getPojoName() {
+		return pojoName;
+	}
+
+	public void setPojoName(String pojoName) {
+		this.pojoName = pojoName;
+	} 
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java
new file mode 100644
index 0000000..07eb08b
--- /dev/null
+++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.avro.test;
+
+public interface TestReflection {
+	
+	public String getName();
+	
+	public void setName(String name);
+
+	public int getAge();
+	
+	public void setAge(int age);
+	
+	public int increaseAge(int age);
+	
+	public void setTestPojo(TestPojo testPojo);
+	
+	public TestPojo getTestPojo();
+	
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java
new file mode 100644
index 0000000..98b9eea
--- /dev/null
+++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.avro.test;
+
+public class TestReflectionImpl implements TestReflection {
+
+	String name = "";
+	int age = 0;
+	TestPojo testPojo;
+	
+	@Override
+	public String getName() {
+		return this.name;
+	}
+
+	@Override
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	@Override
+	public int getAge() {
+		return this.age;
+	}
+
+	@Override
+	public void setAge(int age) {
+		this.age = age;
+	}
+	
+	@Override
+	public int increaseAge(int age) {
+		this.age = ++age;
+		return this.age;
+	}
+
+	@Override
+	public void setTestPojo(TestPojo testPojo) {
+		this.testPojo = testPojo;
+	}
+
+	@Override
+	public TestPojo getTestPojo() {
+		return testPojo;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java
index 094da2e..4ee9cd9 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java
@@ -19,23 +19,42 @@ package org.apache.camel.component.avro;
 
 import java.io.IOException;
 
-import org.apache.avro.Protocol;
+
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.ipc.Requestor;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.camel.CamelContext;
 import org.apache.camel.avro.generated.Key;
-import org.apache.camel.avro.generated.KeyValueProtocol;
 import org.apache.camel.avro.generated.Value;
 import org.apache.camel.avro.impl.KeyValueProtocolImpl;
-
+import org.apache.camel.avro.test.TestPojo;
+import org.apache.camel.avro.test.TestReflection;
+import org.apache.camel.avro.test.TestReflectionImpl;
 import org.junit.After;
 import org.junit.Test;
 
 public abstract class AvroConsumerTestSupport extends AvroTestSupport {
 
+    protected int avroPortMessageInRoute = setupFreePort("avroPortMessageInRoute");
+    protected int avroPortForWrongMessages = setupFreePort("avroPortForWrongMessages");
+
     Transceiver transceiver;
     Requestor requestor;
+
+    Transceiver transceiverMessageInRoute;
+    Requestor requestorMessageInRoute;
+
+    Transceiver transceiverForWrongMessages;
+    Requestor requestorForWrongMessages;
+
+    Transceiver reflectTransceiver;
+    Requestor reflectRequestor;
+
     KeyValueProtocolImpl keyValue = new KeyValueProtocolImpl();
+    TestReflection testReflection = new TestReflectionImpl();
+
+    public static final String REFLECTION_TEST_NAME = "Chucky";
+    public static final int REFLECTION_TEST_AGE = 100;
 
     protected abstract void initializeTranceiver() throws IOException;
 
@@ -47,6 +66,18 @@ public abstract class AvroConsumerTestSupport extends AvroTestSupport {
         if (transceiver != null) {
             transceiver.close();
         }
+
+        if (transceiverMessageInRoute != null) {
+            transceiverMessageInRoute.close();
+        }
+
+        if (transceiverForWrongMessages != null) {
+            transceiverForWrongMessages.close();
+        }
+
+        if (reflectTransceiver != null) {
+            reflectTransceiver.close();
+        }
     }
 
     @Test
@@ -59,6 +90,59 @@ public abstract class AvroConsumerTestSupport extends AvroTestSupport {
     }
 
     @Test
+    public void testInOnlyMessageInRoute() throws Exception {
+        initializeTranceiver();
+        Key key = Key.newBuilder().setKey("1").build();
+        Value value = Value.newBuilder().setValue("test value").build();
+        Object[] request = {key, value};
+        requestorMessageInRoute.request("put", request);
+    }
+
+    @Test
+    public void testInOnlyReflectRequestor() throws Exception {
+        initializeTranceiver();
+        Object[] request = {REFLECTION_TEST_NAME};
+        reflectRequestor.request("setName", request);
+        assertEquals(REFLECTION_TEST_NAME, testReflection.getName());
+    }
+
+    @Test(expected=AvroRuntimeException.class)
+    public void testInOnlyWrongMessageName() throws Exception {
+        initializeTranceiver();
+        Key key = Key.newBuilder().setKey("1").build();
+        Value value = Value.newBuilder().setValue("test value").build();
+        Object[] request = {key, value};
+        requestorMessageInRoute.request("throwException", request);
+    }
+
+    @Test(expected=AvroRuntimeException.class)
+    public void testInOnlyToNotExistingRoute() throws Exception {
+        initializeTranceiver();
+        Key key = Key.newBuilder().setKey("1").build();
+        Value value = Value.newBuilder().setValue("test value").build();
+        Object[] request = {key, value};
+        requestorForWrongMessages.request("get", request);
+    }
+
+    @Test
+    public void testInOnlyReflectSingleParameterNotSet() throws Exception {
+        initializeTranceiver();
+        Object[] request = {100};
+        reflectRequestor.request("setAge", request);
+        assertEquals(0, testReflection.getAge());
+    }
+
+    @Test
+    public void testInOnlyReflectionPojoTest() throws Exception {
+        initializeTranceiver();
+        TestPojo testPojo = new TestPojo();
+        testPojo.setPojoName("pojo1");
+        Object[] request = {testPojo};
+        reflectRequestor.request("setTestPojo", request);
+        assertEquals(testPojo.getPojoName(), testReflection.getTestPojo().getPojoName());
+    }
+
+    @Test
     public void testInOut() throws Exception {
         initializeTranceiver();
         keyValue.getStore().clear();
@@ -70,15 +154,35 @@ public abstract class AvroConsumerTestSupport extends AvroTestSupport {
         assertEquals(value, response);
     }
 
-    @Override
-    protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        Protocol protocol = KeyValueProtocol.PROTOCOL;
-        AvroConfiguration configuration = new AvroConfiguration();
-        configuration.setProtocol(protocol);
-        AvroComponent component = new AvroComponent(context);
-        component.setConfiguration(configuration);
-        context.addComponent("avro", component);
-        return context;
+    @Test
+    public void testInOutMessageInRoute() throws Exception {
+        initializeTranceiver();
+        keyValue.getStore().clear();
+        Key key = Key.newBuilder().setKey("2").build();
+        Value value = Value.newBuilder().setValue("test value").build();
+        keyValue.getStore().put(key, value);
+        Object[] request = {key};
+        Object response = requestorMessageInRoute.request("get", request);
+        assertEquals(value, response);
+    }
+
+    @Test
+    public void testInOutReflectRequestor() throws Exception {
+        initializeTranceiver();
+        Object[] request = {REFLECTION_TEST_AGE};
+        Object response = reflectRequestor.request("increaseAge", request);
+        assertEquals(testReflection.getAge(), response);
+    }
+
+    @Test
+    public void testInOutReflectionPojoTest() throws Exception {
+        initializeTranceiver();
+        TestPojo testPojo = new TestPojo();
+        testPojo.setPojoName("pojo2");
+        Object[] request = {testPojo};
+        reflectRequestor.request("setTestPojo", request);
+        request = new Object[0];
+        Object response = reflectRequestor.request("getTestPojo", request);
+        assertEquals(testPojo.getPojoName(), ((TestPojo) response).getPojoName());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java
index 148055d..8bec47d 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java
@@ -21,11 +21,15 @@ import java.io.IOException;
 import java.net.URL;
 
 import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.reflect.ReflectRequestor;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.camel.avro.generated.KeyValueProtocol;
+import org.apache.camel.avro.test.TestReflection;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.avro.processors.GetProcessor;
 import org.apache.camel.component.avro.processors.PutProcessor;
+import org.apache.camel.component.avro.processors.ReflectionInOnlyProcessor;
+import org.apache.camel.component.avro.processors.ReflectionInOutProcessor;
 
 public class AvroHttpConsumerTest extends AvroConsumerTestSupport {
 
@@ -33,16 +37,51 @@ public class AvroHttpConsumerTest extends AvroConsumerTestSupport {
     protected void initializeTranceiver() throws IOException {
         transceiver = new HttpTransceiver(new URL("http://localhost:" + avroPort));
         requestor = new SpecificRequestor(KeyValueProtocol.class, transceiver);
+
+        transceiverMessageInRoute = new HttpTransceiver(new URL("http://localhost:" + avroPortMessageInRoute));
+        requestorMessageInRoute = new SpecificRequestor(KeyValueProtocol.class, transceiverMessageInRoute);
+
+        transceiverForWrongMessages = new HttpTransceiver(new URL("http://localhost:" + avroPortForWrongMessages));
+        requestorForWrongMessages = new SpecificRequestor(KeyValueProtocol.class, transceiverForWrongMessages);
+
+        reflectTransceiver = new HttpTransceiver(new URL("http://localhost:" + avroPortReflection));
+        reflectRequestor = new ReflectRequestor(TestReflection.class, reflectTransceiver);
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:exception-handler"));
+
                 //In Only
-                from("avro:http:localhost:" + avroPort).choice()
-                    .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue))
-                    .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue));
+                from("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol").choice()
+                        .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue))
+                        .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue));
+
+                from("avro:http:localhost:" + avroPortMessageInRoute + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .process(new PutProcessor(keyValue));
+
+                from("avro:http:localhost:" + avroPortMessageInRoute + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .process(new GetProcessor(keyValue));
+
+                from("avro:http:localhost:" + avroPortForWrongMessages + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .process(new PutProcessor(keyValue));
+
+                from("avro:http:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+                        .process(new ReflectionInOnlyProcessor(testReflection));
+
+                from("avro:http:localhost:" + avroPortReflection + "/setAge?protocolClassName=org.apache.camel.avro.test.TestReflection")
+                        .process(new ReflectionInOnlyProcessor(testReflection));
+
+                from("avro:http:localhost:" + avroPortReflection + "/setTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+                        .process(new ReflectionInOnlyProcessor(testReflection));
+
+                from("avro:http:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+                        .process(new ReflectionInOutProcessor(testReflection));
+
+                from("avro:http:localhost:" + avroPortReflection + "/getTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection")
+                        .process(new ReflectionInOutProcessor(testReflection));
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java
index 237839a..177eae2 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java
@@ -20,8 +20,10 @@ package org.apache.camel.component.avro;
 import java.io.IOException;
 
 import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.reflect.ReflectResponder;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.camel.avro.generated.KeyValueProtocol;
+import org.apache.camel.avro.test.TestReflection;
 import org.apache.camel.builder.RouteBuilder;
 
 public class AvroHttpProducerTest extends AvroProducerTestSupport {
@@ -32,6 +34,11 @@ public class AvroHttpProducerTest extends AvroProducerTestSupport {
             server = new HttpServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), avroPort);
             server.start();
         }
+        
+        if (serverReflection == null) {
+            serverReflection = new HttpServer(new ReflectResponder(TestReflection.class, testReflection), avroPortReflection);
+            serverReflection.start();
+        }
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -39,10 +46,33 @@ public class AvroHttpProducerTest extends AvroProducerTestSupport {
             @Override
             public void configure() throws Exception {
                 //In Only
-                from("direct:in").to("avro:http:localhost:" + avroPort);
+                from("direct:in")
+                .to("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol");
+                
+                //In Only with message in route
+                from("direct:in-message-name")
+                .errorHandler(deadLetterChannel("mock:in-message-name-error"))
+                .to("avro:http:localhost:" + avroPort + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                .to("mock:result-in-message-name");
+                
+                //In Only with existing interface
+                from("direct:in-reflection")
+                .to("avro:http:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true");
 
                 //InOut
-                from("direct:inout").to("avro:http:localhost:" + avroPort).to("mock:result-inout");
+                from("direct:inout")
+                .to("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                .to("mock:result-inout");
+                
+                //InOut with message in route
+                from("direct:inout-message-name")
+                .to("avro:http:localhost:" + avroPort + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                .to("mock:result-inout-message-name");
+                
+                //InOut with existing interface
+                from("direct:inout-reflection")
+                .to("avro:http:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+                .to("mock:result-inout-reflection");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java
index b427217..2156241 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java
@@ -21,11 +21,15 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.reflect.ReflectRequestor;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.camel.avro.generated.KeyValueProtocol;
+import org.apache.camel.avro.test.TestReflection;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.avro.processors.GetProcessor;
 import org.apache.camel.component.avro.processors.PutProcessor;
+import org.apache.camel.component.avro.processors.ReflectionInOnlyProcessor;
+import org.apache.camel.component.avro.processors.ReflectionInOutProcessor;
 
 public class AvroNettyConsumerTest extends AvroConsumerTestSupport {
 
@@ -33,6 +37,15 @@ public class AvroNettyConsumerTest extends AvroConsumerTestSupport {
     protected void initializeTranceiver() throws IOException {
         transceiver = new NettyTransceiver(new InetSocketAddress("localhost", avroPort));
         requestor = new SpecificRequestor(KeyValueProtocol.class, transceiver);
+
+        transceiverMessageInRoute = new NettyTransceiver(new InetSocketAddress("localhost", avroPortMessageInRoute));
+        requestorMessageInRoute = new SpecificRequestor(KeyValueProtocol.class, transceiverMessageInRoute);
+
+        transceiverForWrongMessages = new NettyTransceiver(new InetSocketAddress("localhost", avroPortForWrongMessages));
+        requestorForWrongMessages = new SpecificRequestor(KeyValueProtocol.class, transceiverForWrongMessages);
+
+        reflectTransceiver = new NettyTransceiver(new InetSocketAddress("localhost", avroPortReflection));
+        reflectRequestor = new ReflectRequestor(TestReflection.class, reflectTransceiver);
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -40,9 +53,33 @@ public class AvroNettyConsumerTest extends AvroConsumerTestSupport {
             @Override
             public void configure() throws Exception {
                 //In Only
-                from("avro:netty:localhost:" + avroPort).choice()
-                    .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue))
-                    .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue));
+                from("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol").choice()
+                        .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue))
+                        .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue));
+
+                from("avro:netty:localhost:" + avroPortMessageInRoute + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .process(new PutProcessor(keyValue));
+
+                from("avro:netty:localhost:" + avroPortMessageInRoute + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .process(new GetProcessor(keyValue));
+
+                from("avro:netty:localhost:" + avroPortForWrongMessages + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .process(new PutProcessor(keyValue));
+
+                from("avro:netty:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+                        .process(new ReflectionInOnlyProcessor(testReflection));
+
+                from("avro:netty:localhost:" + avroPortReflection + "/setAge?protocolClassName=org.apache.camel.avro.test.TestReflection")
+                        .process(new ReflectionInOnlyProcessor(testReflection));
+
+                from("avro:netty:localhost:" + avroPortReflection + "/setTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+                        .process(new ReflectionInOnlyProcessor(testReflection));
+
+                from("avro:netty:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+                        .process(new ReflectionInOutProcessor(testReflection));
+
+                from("avro:netty:localhost:" + avroPortReflection + "/getTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection")
+                        .process(new ReflectionInOutProcessor(testReflection));
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java
index 4b890af..5816e39 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java
@@ -20,30 +20,60 @@ package org.apache.camel.component.avro;
 import java.net.InetSocketAddress;
 
 import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.reflect.ReflectResponder;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.camel.avro.generated.KeyValueProtocol;
+import org.apache.camel.avro.test.TestReflection;
 import org.apache.camel.builder.RouteBuilder;
 
 public class AvroNettyProducerTest extends AvroProducerTestSupport {
 
+    @Override
+    protected void initializeServer() {
+        if (server == null) {
+            server = new NettyServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), new InetSocketAddress("localhost", avroPort));
+            server.start();
+        }
+
+        if (serverReflection == null) {
+            serverReflection = new NettyServer(new ReflectResponder(TestReflection.class, testReflection), new InetSocketAddress("localhost", avroPortReflection));
+            serverReflection.start();
+        }
+    }
+
     public RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 //In Only
-                from("direct:in").to("avro:netty:localhost:" + avroPort);
+                from("direct:in")
+                        .to("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol");
+
+                //In Only with message in route
+                from("direct:in-message-name")
+                        .errorHandler(deadLetterChannel("mock:in-message-name-error"))
+                        .to("avro:netty:localhost:" + avroPort + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .to("mock:result-in-message-name");
+
+                //In Only with existing interface
+                from("direct:in-reflection")
+                        .to("avro:netty:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection");
 
                 //InOut
-                from("direct:inout").to("avro:netty:localhost:" + avroPort).to("mock:result-inout");
+                from("direct:inout")
+                        .to("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .to("mock:result-inout");
+
+                //InOut with message in route
+                from("direct:inout-message-name")
+                        .to("avro:netty:localhost:" + avroPort + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+                        .to("mock:result-inout-message-name");
+
+                //InOut with existing interface
+                from("direct:inout-reflection")
+                        .to("avro:netty:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection")
+                        .to("mock:result-inout-reflection");
             }
         };
     }
-
-    @Override
-    protected void initializeServer() {
-        if (server == null) {
-            server = new NettyServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), new InetSocketAddress("localhost", avroPort));
-            server.start();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java
index 3121934..a0fe869 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.avro;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.avro.impl.KeyValueProtocolImpl;
+import org.apache.camel.avro.test.TestReflectionImpl;
 import org.apache.camel.spring.SpringCamelContext;
 
 import org.junit.After;
@@ -37,6 +38,7 @@ public class AvroNettySpringConsumerTest extends AvroNettyConsumerTest {
         super.setUp();
 
         keyValue = (KeyValueProtocolImpl) applicationContext.getBean("keyValue");
+        testReflection = (TestReflectionImpl) applicationContext.getBean("testReflection");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java
index c029a56..39c215d 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java
@@ -19,13 +19,12 @@ package org.apache.camel.component.avro;
 
 import java.io.IOException;
 
-import org.apache.avro.Protocol;
 import org.apache.avro.ipc.Server;
 import org.apache.camel.CamelContext;
 import org.apache.camel.avro.generated.Key;
-import org.apache.camel.avro.generated.KeyValueProtocol;
 import org.apache.camel.avro.generated.Value;
 import org.apache.camel.avro.impl.KeyValueProtocolImpl;
+import org.apache.camel.avro.test.TestReflectionImpl;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.After;
 import org.junit.Test;
@@ -33,7 +32,9 @@ import org.junit.Test;
 public abstract class AvroProducerTestSupport extends AvroTestSupport {
 
     Server server;
+    Server serverReflection;
     KeyValueProtocolImpl keyValue = new KeyValueProtocolImpl();
+    TestReflectionImpl testReflection = new TestReflectionImpl();
 
     protected abstract void initializeServer() throws IOException;
 
@@ -52,6 +53,10 @@ public abstract class AvroProducerTestSupport extends AvroTestSupport {
         if (server != null) {
             server.close();
         }
+
+        if (serverReflection != null) {
+            serverReflection.close();
+        }
     }
 
     @Test
@@ -64,6 +69,40 @@ public abstract class AvroProducerTestSupport extends AvroTestSupport {
     }
 
     @Test
+    public void testInOnlyWithMessageNameInRoute() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:result-in-message-name");
+        mock.expectedMessageCount(1);
+        Key key = Key.newBuilder().setKey("1").build();
+        Value value = Value.newBuilder().setValue("test value").build();
+        Object[] request = {key, value};
+        template.sendBody("direct:in-message-name", request);
+        assertEquals(value, keyValue.getStore().get(key));
+        mock.assertIsSatisfied(5000);
+    }
+
+    @Test
+    public void testInOnlyReflection() throws InterruptedException {
+        String name = "Chuck";
+        Object[] request = {name};
+        template.sendBody("direct:in-reflection", request);
+        assertEquals(name, testReflection.getName());
+    }
+
+    @Test
+    public void testInOnlyWithWrongMessageNameInMessage() throws InterruptedException {
+        MockEndpoint mockInMessageEnd = getMockEndpoint("mock:result-in-message-name");
+        mockInMessageEnd.expectedMessageCount(0);
+        MockEndpoint mockErrorChannel = getMockEndpoint("mock:in-message-name-error");
+        mockErrorChannel.expectedMessageCount(1);
+        Key key = Key.newBuilder().setKey("1").build();
+        Value value = Value.newBuilder().setValue("test value").build();
+        Object[] request = {key, value};
+        template.sendBodyAndHeader("direct:in-message-name", request, AvroConstants.AVRO_MESSAGE_NAME, "/get");
+        mockErrorChannel.assertIsSatisfied(5000);
+        mockInMessageEnd.assertIsSatisfied();
+    }
+
+    @Test
     public void testInOut() throws InterruptedException {
         keyValue.getStore().clear();
         Key key = Key.newBuilder().setKey("2").build();
@@ -74,18 +113,33 @@ public abstract class AvroProducerTestSupport extends AvroTestSupport {
         mock.expectedMessageCount(1);
         mock.expectedBodiesReceived(value);
         template.sendBodyAndHeader("direct:inout", key, AvroConstants.AVRO_MESSAGE_NAME, "get");
-        mock.assertIsSatisfied(10000);
+        mock.assertIsSatisfied(5000);
     }
 
-    @Override
-    protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        Protocol protocol = KeyValueProtocol.PROTOCOL;
-        AvroConfiguration configuration = new AvroConfiguration();
-        configuration.setProtocol(protocol);
-        AvroComponent component = new AvroComponent(context);
-        component.setConfiguration(configuration);
-        context.addComponent("avro", component);
-        return context;
+    @Test
+    public void testInOutMessageNameInRoute() throws InterruptedException {
+        keyValue.getStore().clear();
+        Key key = Key.newBuilder().setKey("2").build();
+        Value value = Value.newBuilder().setValue("test value").build();
+        keyValue.getStore().put(key, value);
+
+        MockEndpoint mock = getMockEndpoint("mock:result-inout-message-name");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(value);
+        template.sendBody("direct:inout-message-name", key);
+        mock.assertIsSatisfied(5000);
+    }
+
+    @Test
+    public void testInOutReflection() throws InterruptedException {
+        int age = 100;
+        Object[] request = {age};
+
+        MockEndpoint mock = getMockEndpoint("mock:result-inout-reflection");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(++age);
+        template.sendBody("direct:inout-reflection", request);
+        mock.assertIsSatisfied(5000);
     }
+
 }
\ No newline at end of file


Mime
View raw message