Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 02881D784 for ; Mon, 4 Mar 2013 07:06:39 +0000 (UTC) Received: (qmail 17021 invoked by uid 500); 4 Mar 2013 07:06:38 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 16929 invoked by uid 500); 4 Mar 2013 07:06:37 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 16910 invoked by uid 99); 4 Mar 2013 07:06:36 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Mar 2013 07:06:36 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Mar 2013 07:06:33 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id ECEF323888E7; Mon, 4 Mar 2013 07:06:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1452197 - in /camel/trunk/components/camel-rx/src: main/java/org/apache/camel/rx/ test/java/org/apache/camel/rx/ Date: Mon, 04 Mar 2013 07:06:12 -0000 To: commits@camel.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130304070612.ECEF323888E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jstrachan Date: Mon Mar 4 07:06:12 2013 New Revision: 1452197 URL: http://svn.apache.org/r1452197 Log: added an overloaded version of toObservable() that takes the body type as an argument so that the body of the message can be extracted and processed as an Observable directly for cases where the user knows the type of the payload in a message Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java - copied, changed from r1452192, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java (with props) Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java?rev=1452197&r1=1452196&r2=1452197&view=diff ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java (original) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java Mon Mar 4 07:06:12 2013 @@ -49,6 +49,17 @@ public class ReactiveCamel { } /** + * Returns an {@link rx.Observable } for the messages with their payload converted to the given type + * to allow the messages sent on the endpoint + * to be processed using Reactive Extensions + */ + public Observable toObservable(String uri, final Class bodyType) { + return toObservable(camelContext.getEndpoint(uri), bodyType); + } + + + + /** * Returns an {@link rx.Observable < org.apache.camel.Message >} to allow the messages sent on the endpoint * to be processed using Reactive Extensions */ @@ -61,6 +72,21 @@ public class ReactiveCamel { }); } + /** + * Returns an {@link rx.Observable } for the messages with their payload converted to the given type + * to allow the messages sent on the endpoint + * to be processed using Reactive Extensions + */ + public Observable toObservable(Endpoint endpoint, final Class bodyType) { + return createEndpointObservable(endpoint, new Func1() { + @Override + public T call(Exchange exchange) { + Message in = exchange.getIn(); + return in.getBody(bodyType); + } + }); + } + public CamelContext getCamelContext() { return camelContext; } Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java (from r1452192, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java&r1=1452192&r2=1452197&rev=1452197&view=diff ============================================================================== --- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java (original) +++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java Mon Mar 4 07:06:12 2013 @@ -17,7 +17,6 @@ */ package org.apache.camel.rx; -import org.apache.camel.Message; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; import org.slf4j.Logger; @@ -25,27 +24,48 @@ import org.slf4j.LoggerFactory; import rx.Observable; import rx.util.functions.Action1; +import rx.util.functions.Func1; /** */ -public class ObservableMessageTest extends RxTestSupport { - private static final transient Logger LOG = LoggerFactory.getLogger(ObservableMessageTest.class); +public class ObservableBodyTest extends RxTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(ObservableBodyTest.class); @Test public void testConsume() throws Exception { final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class); - mockEndpoint.expectedMessageCount(4); + mockEndpoint.expectedBodiesReceived("b", "d"); - Observable observable = reactiveCamel.toObservable("timer://foo?fixedRate=true&period=100"); - observable.take(4).subscribe(new Action1() { + // lets consume, filter and map events + Observable observable = reactiveCamel.toObservable("seda:orders", Order.class); + Observable largeOrderIds = observable.filter(new Func1() { + public Boolean call(Order order) { + return order.getAmount() > 100.0; + } + }).map(new Func1() { + public String call(Order order) { + return order.getId(); + } + }); + + + // lets route the largeOrderIds to the mock endpoint for testing + largeOrderIds.take(2).subscribe(new Action1() { @Override - public void call(Message message) { - String body = "Processing message headers " + message.getHeaders(); - LOG.info(body); + public void call(String body) { + LOG.info("Processing " + body); producerTemplate.sendBody(mockEndpoint, body); } }); + + // now lets send some orders in + Order[] orders = {new Order("a", 49.95), new Order("b", 125.50), new Order("c", 22.95), + new Order("d", 259.95), new Order("e", 1.25)}; + for (Order order : orders) { + producerTemplate.sendBody("seda:orders", order); + } + mockEndpoint.assertIsSatisfied(); } } Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java?rev=1452197&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java (added) +++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java Mon Mar 4 07:06:12 2013 @@ -0,0 +1,42 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.rx; + +/** + */ +public class Order { + private String id; + private double amount; + + public Order(String id, double amount) { + this.amount = amount; + this.id = id; + } + + public String toString() { + return "Order[id " + id + ", amount " + amount + "]"; + } + + public double getAmount() { + return amount; + } + + public String getId() { + return id; + } +} Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java?rev=1452197&r1=1452196&r2=1452197&view=diff ============================================================================== --- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java (original) +++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java Mon Mar 4 07:06:12 2013 @@ -36,10 +36,12 @@ public abstract class RxTestSupport { reactiveCamel = new ReactiveCamel(camelContext); producerTemplate = camelContext.createProducerTemplate(); camelContext.start(); + producerTemplate.start(); } @After public void destroy() throws Exception { + producerTemplate.stop(); camelContext.stop(); } }