camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1332698 - in /camel/trunk/components/camel-avro/src/main/java/org/apache/camel: component/avro/ dataformat/avro/
Date Tue, 01 May 2012 15:25:16 GMT
Author: davsclaus
Date: Tue May  1 15:25:15 2012
New Revision: 1332698

URL: http://svn.apache.org/viewvc?rev=1332698&view=rev
Log:
CAMEL-5238: Fixed avro producer to support async routing engine.

Modified:
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpProducer.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyProducer.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
    camel/trunk/components/camel-avro/src/main/java/org/apache/camel/dataformat/avro/AvroDataFormat.java

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import java.lang.reflect.Field;
@@ -73,14 +72,10 @@ public class AvroComponent extends Defau
     }
 
     /**
-     * Applies enpoint parameters to configuration & resolves protocol and other required
configuration properties.
-     * @param config
-     * @param enpointUri
-     * @param parameters
-     * @throws Exception
+     * Applies endpoint parameters to configuration & resolves protocol and other required
configuration properties.
      */
-    private void applyToConfiguration(AvroConfiguration config, URI enpointUri, Map<String,
Object> parameters) throws Exception {
-        config.parseURI(enpointUri, parameters, this);
+    private void applyToConfiguration(AvroConfiguration config, URI endpointUri, Map<String,
Object> parameters) throws Exception {
+        config.parseURI(endpointUri, parameters, this);
         setProperties(config, parameters);
 
         if (config.getProtocol() == null && config.getProtocolClassName() != null)
{
@@ -99,7 +94,6 @@ public class AvroComponent extends Defau
         }
     }
 
-
     public AvroConfiguration getConfiguration() {
         return configuration;
     }

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import java.net.URI;
@@ -33,7 +32,6 @@ public class AvroConfiguration implement
     private String protocolClassName;
     private String transport;
 
-
     public AvroConfiguration copy() {
         try {
             AvroConfiguration answer = (AvroConfiguration) clone();

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 public final class AvroConstants {

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import org.apache.camel.Endpoint;

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import org.apache.avro.Protocol;
@@ -42,7 +41,6 @@ public abstract class AvroEndpoint exten
         this.configuration = configuration;
     }
 
-
     public Exchange createExchange(Protocol.Message message, Object request) {
         ExchangePattern pattern = ExchangePattern.InOut;
         if (message.getResponse().equals(Schema.Type.NULL)) {
@@ -54,14 +52,9 @@ public abstract class AvroEndpoint exten
         return exchange;
     }
 
-    /**
-     * Whether this class supports being singleton or not.
-     *
-     * @return <tt>true</tt> to be a single shared instance, <tt>false</tt>
to create new instances.
-     */
     @Override
     public boolean isSingleton() {
-        return false;
+        return true;
     }
 
     public AvroConfiguration getConfiguration() {

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import org.apache.avro.ipc.HttpServer;

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import org.apache.camel.Component;

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpProducer.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpProducer.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpProducer.java
Tue May  1 15:25:15 2012
@@ -32,7 +32,7 @@ public class AvroHttpProducer extends Av
     }
 
     @Override
-    public Transceiver createTranceiver() throws Exception {
+    public Transceiver createTransceiver() throws Exception {
         return new HttpTransceiver(new URL(URISupport.normalizeUri(getEndpoint().getEndpointUri())));
     }
 }

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import java.net.InetSocketAddress;

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import org.apache.camel.Component;

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyProducer.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyProducer.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyProducer.java
Tue May  1 15:25:15 2012
@@ -31,7 +31,7 @@ public class AvroNettyProducer extends A
     }
 
     @Override
-    public Transceiver createTranceiver() throws Exception {
+    public Transceiver createTransceiver() throws Exception {
         AvroConfiguration configuration = getEndpoint().getConfiguration();
         return transceiver = new NettyTransceiver(new InetSocketAddress(configuration.getHost(),
configuration.getPort()));
     }

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
Tue May  1 15:25:15 2012
@@ -14,19 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import org.apache.avro.ipc.Callback;
 import org.apache.avro.ipc.Requestor;
 import org.apache.avro.ipc.Transceiver;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ServicePoolAware;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
 
-public abstract class AvroProducer extends DefaultProducer implements ServicePoolAware {
+public abstract class AvroProducer extends DefaultAsyncProducer implements ServicePoolAware
{
 
     Transceiver transceiver;
     Requestor requestor;
@@ -35,35 +35,53 @@ public abstract class AvroProducer exten
         super(endpoint);
     }
 
-    public abstract Transceiver createTranceiver() throws Exception;
+    public abstract Transceiver createTransceiver() throws Exception;
 
-    /**
-     * Processes the message exchange
-     *
-     * @param exchange the message exchange
-     * @throws Exception if an internal processing error has occurred.
-     */
     @Override
-    public void process(final Exchange exchange) throws Exception {
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
         Object request = exchange.getIn().getBody();
 
         if (transceiver == null) {
-            transceiver = createTranceiver();
-            requestor = new AvroRequestor(getEndpoint().getProtocol(), transceiver);
+            try {
+                transceiver = createTransceiver();
+                requestor = new AvroRequestor(getEndpoint().getProtocol(), transceiver);
+            } catch (Exception e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
         }
 
-        requestor.request(exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class),
wrapObjectToArray(request), new Callback<Object>() {
-
-            @Override
-            public void handleResult(Object result) {
-                exchange.getOut().setBody(result);
-            }
+        try {
+            requestor.request(exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME,
String.class), wrapObjectToArray(request), new Callback<Object>() {
+                @Override
+                public void handleResult(Object result) {
+                    // got result from avro, so set it on the exchange and invoke the callback
+                    try {
+                        exchange.getOut().setBody(result);
+                    } finally {
+                        callback.done(false);
+                    }
+                }
+
+                @Override
+                public void handleError(Throwable error) {
+                    // got error from avro, so set it on the exchange and invoke the callback
+                    try {
+                        exchange.setException(error);
+                    } finally {
+                        callback.done(false);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
 
-            @Override
-            public void handleError(Throwable error) {
-                exchange.setException(error);
-            }
-        });
+        // okay we continue routing asynchronously
+        return false;
     }
 
     public Object[] wrapObjectToArray(Object object) {

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import java.io.IOException;

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.avro;
 
 import org.apache.avro.Protocol;
@@ -30,11 +29,6 @@ public class AvroResponder extends Speci
 
     private AvroConsumer consumer;
 
-    /**
-     * Constructor
-     *
-     * @param consumer
-     */
     public AvroResponder(AvroConsumer consumer) {
         super(consumer.getEndpoint().getProtocol(), null);
         this.consumer = consumer;
@@ -42,7 +36,7 @@ public class AvroResponder extends Speci
 
     @Override
     public Object respond(Protocol.Message message, Object request) throws Exception {
-        Object response = null;
+        Object response;
         int numParams = message.getRequest().getFields().size();
         Object[] params = new Object[numParams];
         Class<?>[] paramTypes = new Class[numParams];
@@ -76,6 +70,6 @@ public class AvroResponder extends Speci
             }
         }
         return response;
-
     }
+
 }

Modified: camel/trunk/components/camel-avro/src/main/java/org/apache/camel/dataformat/avro/AvroDataFormat.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-avro/src/main/java/org/apache/camel/dataformat/avro/AvroDataFormat.java?rev=1332698&r1=1332697&r2=1332698&view=diff
==============================================================================
--- camel/trunk/components/camel-avro/src/main/java/org/apache/camel/dataformat/avro/AvroDataFormat.java
(original)
+++ camel/trunk/components/camel-avro/src/main/java/org/apache/camel/dataformat/avro/AvroDataFormat.java
Tue May  1 15:25:15 2012
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.dataformat.avro;
 
 import java.io.InputStream;
@@ -44,14 +43,11 @@ public class AvroDataFormat implements D
     private Schema schema;
     private String instanceClassName;
 
-    /**
-     * @param schema
-     */
-    public AvroDataFormat(Schema schema) {
-        this.schema = schema;
+    public AvroDataFormat() {
     }
 
-    public AvroDataFormat() {
+    public AvroDataFormat(Schema schema) {
+        this.schema = schema;
     }
 
     public synchronized Schema getSchema(Exchange exchange, Object graph) throws Exception
{



Mime
View raw message