Splitter has been edited by Claus Ibsen (Apr 09, 2009).

Splitter

The Splitter from the EIP patterns allows you to split a message into a number of pieces and process them individually.

As of Camel 2.0, you need to specify a Splitter as split(). In earlier versions of Camel, you need to use splitter().

What does the splitter return?

By default, the Splitter returns the last split message. You can override this by supplying your own strategy as an AggregationStrategy. There is a sample on this page (Split aggregate request/reply sample). Notice that it's the same strategy that the Aggregator supports, so the Splitter can be viewed as having a built-in lightweight Aggregator.
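The two behaviours described above can be pictured with a small plain-Java model. This is only a sketch and does not use the Camel API: the class SplitResultSketch and the method splitAndProcess are hypothetical names, and plain Strings stand in for Exchanges. Without a strategy the result of the last part wins; with a strategy the results are folded together pairwise.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Plain-Java model of how the split result is chosen; NOT Camel API.
final class SplitResultSketch {

    // Processes each part in order. With a null strategy the result of the
    // last part is returned; otherwise results are combined pairwise.
    static String splitAndProcess(List<String> parts,
                                  UnaryOperator<String> processor,
                                  BinaryOperator<String> strategy) {
        String result = null;
        boolean first = true;
        for (String part : parts) {
            String processed = processor.apply(part);
            if (first || strategy == null) {
                // first part, or default behaviour: the latest result wins
                result = processed;
            } else {
                // custom strategy: combine the result so far with the new one
                result = strategy.apply(result, processed);
            }
            first = false;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList("A", "B", "C");
        UnaryOperator<String> processor = s -> "item=" + s;
        // default: prints item=C
        System.out.println(splitAndProcess(parts, processor, null));
        // with a strategy: prints item=A;item=B;item=C
        System.out.println(splitAndProcess(parts, processor, (a, b) -> a + ";" + b));
    }
}
```

In Camel itself the strategy works on Exchange objects rather than Strings, as the sample later on this page shows.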

Example

The following example shows how to take a request from the seda:a endpoint, split it into pieces using an Expression, then forward each piece to seda:b.

Using the Fluent Builders

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        errorHandler(deadLetterChannel("mock:error"));

        from("seda:a").split(body(String.class).tokenize("\n")).to("seda:b");
    }
};

The splitter can use any Expression language, so you could use any of the Languages Supported, such as XPath, XQuery, SQL or one of the Scripting Languages, to perform the split. For example:

from("activemq:my.queue").split(xpath("//foo/bar")).convertBodyTo(String.class).to("file://some/directory");

Using the Spring XML Extensions

<camelContext errorHandlerRef="errorHandler" id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="seda:a"/>
        <split>
            <xpath>/invoice/lineItems</xpath>
            <to uri="seda:b"/>
        </split>
    </route>
</camelContext>

For further examples of this pattern in use you could look at one of the JUnit test cases.

Using Tokenizer from Spring XML Extensions
Available as of Camel 2.0

You can use the tokenizer expression in the Spring DSL to split bodies or headers using a token. This is a common use case, so we provide a special tokenize tag for it.
In the sample below we split the body using @ as the separator. You can of course use a comma or a space instead, or even a regex pattern; in that case also set regex=true.

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <split>
            <tokenize token="@"/>
            <to uri="mock:result"/>
        </split>
    </route>
</camelContext>
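The difference between a literal token and a regex token can be sketched in plain Java. This is only an illustration under the assumption that a literal token is matched character for character; TokenizeSketch and its tokenize method are hypothetical names and not part of Camel, and Camel's own tokenizer may treat details such as empty tokens differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java illustration of literal vs. regex tokens; NOT Camel API.
final class TokenizeSketch {

    // Splits the text on the token. When regex is false the token is quoted
    // so that characters such as '.' or '|' are matched literally.
    static List<String> tokenize(String text, String token, boolean regex) {
        String pattern = regex ? token : Pattern.quote(token);
        return Arrays.asList(text.split(pattern));
    }

    public static void main(String[] args) {
        System.out.println(tokenize("A@B@C", "@", false));     // literal @
        System.out.println(tokenize("A.B.C", ".", false));     // literal dot
        System.out.println(tokenize("A1B22C", "\\d+", true));  // digits as separator
    }
}
```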

Message Headers

The following headers are set on each Exchange that is split:

header | type | description
org.apache.camel.splitCounter | int | Camel 1.x: A split counter that increases for each Exchange being split. The counter starts from 0.
org.apache.camel.splitSize | int | Camel 1.x: The total number of Exchanges that were split. This header is not applied for stream based splitting, and is only set if the Expression returns an instance of java.util.Collection.
CamelSplitIndex | int | Camel 2.0: A split counter that increases for each Exchange being split. The counter starts from 0.
CamelSplitSize | int | Camel 2.0: The total number of Exchanges that were split. This header is not applied for stream based splitting, and is only set if the Expression returns an instance of java.util.Collection.
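To make the header rules concrete, here is a plain-Java sketch of which headers each part would carry. It only models the description above and is not Camel's implementation: SplitHeadersSketch and headersFor are hypothetical names, and a java.util.Map stands in for the message headers. The size header is added only when the split result is a java.util.Collection, because only then is the total known up front.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java model of the Camel 2.0 split headers; NOT Camel's implementation.
final class SplitHeadersSketch {
    static final String SPLIT_INDEX = "CamelSplitIndex";
    static final String SPLIT_SIZE = "CamelSplitSize";

    // Returns one header map per part of the split result.
    static List<Map<String, Object>> headersFor(Object splitResult) {
        Iterator<?> it;
        Integer size = null;
        if (splitResult instanceof Collection) {
            Collection<?> collection = (Collection<?>) splitResult;
            size = collection.size();          // total is known for a Collection
            it = collection.iterator();
        } else if (splitResult instanceof Iterator) {
            it = (Iterator<?>) splitResult;    // total is unknown until exhausted
        } else {
            throw new IllegalArgumentException("Expected a Collection or an Iterator");
        }
        List<Map<String, Object>> all = new ArrayList<>();
        int index = 0;                         // the counter starts from 0
        while (it.hasNext()) {
            it.next();
            Map<String, Object> headers = new HashMap<>();
            headers.put(SPLIT_INDEX, index++);
            if (size != null) {
                headers.put(SPLIT_SIZE, size);
            }
            all.add(headers);
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(headersFor(Arrays.asList("a", "b", "c")));
        System.out.println(headersFor(Arrays.asList("a", "b").iterator()));
    }
}
```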

Parallel execution of distinct 'parts'

If you want to execute all parts in parallel, you can use the special form of split() that takes two arguments, where the second is a boolean flag indicating whether processing should be parallel. For example:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); 
from("activemq:my.queue").split(xPathBuilder, true).to("activemq:my.parts");

In Camel 2.0 the boolean option has been refactored into a builder method, parallelProcessing, so it's easier to understand what the route does when it uses a named method instead of true|false.

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); 
from("activemq:my.queue").split(xPathBuilder).parallelProcessing().to("activemq:my.parts");
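As a rough picture of what processing the parts in parallel means, the following plain-Java sketch submits one task per part to a thread pool and collects the results. It is not how Camel implements the feature: ParallelSplitSketch and processInParallel are hypothetical names, and the pool size of 4 is an arbitrary choice for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Plain-Java sketch of processing split parts in parallel; NOT Camel API.
final class ParallelSplitSketch {

    // Runs the processor on every part using the given pool. invokeAll
    // returns the futures in the same order as the tasks, so the results
    // line up with the parts even though they may run concurrently.
    static List<String> processInParallel(List<String> parts,
                                          Function<String, String> processor,
                                          ExecutorService pool) {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String part : parts) {
            tasks.add(() -> processor.apply(part));
        }
        List<String> results = new ArrayList<>();
        try {
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing parts", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Processing a part failed", e.getCause());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(processInParallel(
                    Arrays.asList("a", "b", "c"), s -> s.toUpperCase(), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```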

Stream based

Available as of Camel 1.5

You can split streams by enabling the streaming mode using the streaming builder method.

from("direct:streaming").split(body().tokenize(",")).streaming().to("activemq:my.parts");
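The idea behind streaming can be illustrated in plain Java with a java.util.Scanner, which hands out one token at a time instead of building the whole list first. This is a sketch only: StreamingSplitSketch and tokenizeLazily are hypothetical names, and nothing here is claimed about how Camel implements streaming internally. Because the parts are produced on demand, the total number of parts is not known until the input is exhausted.

```java
import java.io.StringReader;
import java.util.Iterator;
import java.util.Scanner;
import java.util.regex.Pattern;

// Plain-Java illustration of lazy, stream-style tokenizing; NOT Camel API.
final class StreamingSplitSketch {

    // Returns an iterator that reads the next token from the source only
    // when it is asked for. The delimiter is matched literally.
    static Iterator<String> tokenizeLazily(Readable source, String delimiter) {
        return new Scanner(source).useDelimiter(Pattern.quote(delimiter));
    }

    public static void main(String[] args) {
        Iterator<String> parts = tokenizeLazily(new StringReader("a,b,c"), ",");
        while (parts.hasNext()) {
            // each part could be forwarded here before the next one is read
            System.out.println(parts.next());
        }
    }
}
```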

Specifying a custom aggregation strategy

Available as of Camel 2.0

This is specified in the same way as for the Aggregator.

Specifying a custom ThreadPoolExecutor

You can customize the underlying ThreadPoolExecutor used in the parallel splitter. In the Java DSL try something like this:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); 
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
from("activemq:my.queue").split(xPathBuilder, true, threadPoolExecutor).to("activemq:my.parts");

In the Spring DSL try this:

Available as of Camel 1.6.0

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:parallel-custom-pool"/>
    <split threadPoolExecutorRef="threadPoolExecutor">
      <xpath>/invoice/lineItems</xpath>
      <to uri="mock:result"/>
    </split>
  </route>
</camelContext>

<!-- There's an easier way of specifying constructor args, just can't remember it
     at the moment... old Spring syntax will do for now! -->
<bean id="threadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
  <constructor-arg index="0" value="8"/>
  <constructor-arg index="1" value="16"/>
  <constructor-arg index="2" value="0"/>
  <constructor-arg index="3" value="MILLISECONDS"/>
  <constructor-arg index="4"><bean class="java.util.concurrent.LinkedBlockingQueue"/></constructor-arg>
</bean>

Using a Pojo to do the splitting

As the Splitter can use any Expression to do the actual splitting, we leverage this fact and use a method expression to invoke a Bean to get the split parts.
The Bean should return a value that is iterable, such as a java.util.Collection, a java.util.Iterator or an array.

In the route we define the Expression as a method call to invoke our Bean that we have registered with the id mySplitterBean in the Registry.

from("direct:start")
        // here we use a POJO bean mySplitterBean to do the split of the payload
        .split().method("mySplitterBean")
        .to("mock:result");

The logic for our Bean is as simple as the following. Notice that we use Camel Bean Binding to pass in the message body as a String object.

import java.util.ArrayList;
import java.util.List;

public class MySplitterBean {

    /**
     * The split method returns something that is iterable such as a java.util.List.
     *
     * @param body the payload of the incoming message
     * @return a list containing each split part
     */
    public List<String> split(String body) {
        // since this is based on a unit test you can of course
        // use different logic for splitting, as Camel has out
        // of the box support for splitting a String based on comma,
        // but this is for show and tell: since this is Java code
        // you have full control over how you split your messages
        List<String> answer = new ArrayList<String>();
        String[] parts = body.split(",");
        for (String part : parts) {
            answer.add(part);
        }
        return answer;
    }
}
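The requirement that the Bean returns a Collection, an Iterator or an array can be pictured as all three being reduced to a single Iterator before the parts are processed. The sketch below shows that idea in plain Java. It is an assumption made for illustration, not Camel's actual conversion code: IterableResultSketch and toIterator are hypothetical names, and treating any other value as a single part is also an assumption.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;

// Plain-Java sketch of reducing a bean's return value to an Iterator;
// NOT Camel's implementation.
final class IterableResultSketch {

    static Iterator<?> toIterator(Object value) {
        if (value instanceof Collection) {
            return ((Collection<?>) value).iterator();
        }
        if (value instanceof Iterator) {
            return (Iterator<?>) value;
        }
        if (value instanceof Object[]) {
            // covers arrays of objects such as String[]; primitive arrays
            // like int[] are not handled in this sketch
            return Arrays.asList((Object[]) value).iterator();
        }
        // assumption: anything else is treated as one single part
        return Collections.singletonList(value).iterator();
    }

    public static void main(String[] args) {
        Iterator<?> parts = toIterator(new String[] {"x", "y"});
        while (parts.hasNext()) {
            System.out.println(parts.next());
        }
    }
}
```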

Split aggregate request/reply sample

This sample shows how you can split an Exchange, process each split message, aggregate the results and return a combined response to the original caller using request/reply.

The route below illustrates this, and how the split supports an AggregationStrategy to hold the in-progress processed messages:

// this route starts from the direct:start endpoint
// the body is then split based on the @ separator
// the splitter in Camel supports InOut as well, and for that we need
// to be able to aggregate the response we send back, so we provide our
// own strategy with the class MyOrderStrategy.
from("direct:start")
    .split(body().tokenize("@"), new MyOrderStrategy())
        // each split message is then sent to this bean where we can process it
        .to("bean:MyOrderService?method=handleOrder")
        // it is important to end the splitter route here, as we do not want
        // to do more routing on each split message
    .end()
    // after we have split and handled each message we want to send a single combined
    // response back to the original caller, so we let this bean build it for us
    // this bean will receive the result of the aggregation strategy: MyOrderStrategy
    .to("bean:MyOrderService?method=buildCombinedResponse");

And the OrderService bean is as follows:

public static class MyOrderService {

    private static int counter;

    /**
     * We just handle the order by returning an id line for the order
     */
    public String handleOrder(String line) {
        LOG.debug("HandleOrder: " + line);
        return "(id=" + ++counter + ",item=" + line + ")";
    }

    /**
     * We use the same bean for building the combined response to send
     * back to the original caller
     */
    public String buildCombinedResponse(String line) {
        LOG.debug("BuildCombinedResponse: " + line);
        return "Response[" + line + "]";
    }
}

Our custom AggregationStrategy is responsible for holding the in-progress aggregated message. After the splitter has ended, that message is sent to the buildCombinedResponse method for final processing before the combined response is returned to the waiting caller.

/**
 * This is our own order aggregation strategy where we can control
 * how each split message should be combined. As we do not want to
 * lose any message, we copy from the new to the old to preserve the
 * order lines as long as we process them
 */
public static class MyOrderStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // put order together in old exchange by adding the order from new exchange

        // copy from OUT as we use InOut pattern
        String orders = oldExchange.getOut().getBody(String.class);
        String newLine = newExchange.getOut().getBody(String.class);

        LOG.debug("Aggregate old orders: " + orders);
        LOG.debug("Aggregate new order: " + newLine);

        // put orders together separating by semi colon
        orders = orders + ";" + newLine;
        // put combined order back on old to preserve it
        oldExchange.getOut().setBody(orders);

        // return old as this is the one that has all the orders gathered until now
        return oldExchange;
    }
}

So let's run the sample and see how it works.
We send an Exchange to the direct:start endpoint containing an IN body with the String value: A@B@C. The flow is:

HandleOrder: A
HandleOrder: B
Aggregate old orders: (id=1,item=A)
Aggregate new order: (id=2,item=B)
HandleOrder: C
Aggregate old orders: (id=1,item=A);(id=2,item=B)
Aggregate new order: (id=3,item=C)
BuildCombinedResponse: (id=1,item=A);(id=2,item=B);(id=3,item=C)
Response to caller: Response[(id=1,item=A);(id=2,item=B);(id=3,item=C)]

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started guide. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.
