Thursday, December 1, 2011

The Composed Message Processor Pattern: Splitter, Router and Aggregator

The book Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf, introduces the Composed Message Processor pattern. This pattern serves to send the same message to multiple modules (e.g., departments inside a company). To do this, the pattern has a splitter part, which gives an identifier (and more...) to each message before routing multiple copies to different modules (e.g., the Widgets and the Gadgets departments), and an aggregator part, which reassembles the multiple messages into a single one after those modules respond.

I hope they will forgive me for copying a picture from their book here:


With JBoss ESB, we have a number of "out-of-the-box" actions that allow us to implement a pattern like this in a relatively simple way. The splitter and the router parts work together in a single action, which can be the StaticRouter or the ContentBasedRouter action (among others, like the WireTaps). All of these are described in the JBoss ESB "Programmers Guide". Later, the aggregator waits for the right number of messages sent by the router (the ContentBasedRouter, for instance) and returns a single message with all the messages it received in the attachment (it will time out if some message does not arrive). Assume that the ContentBasedRouter sends two messages, one for the blue team and one for the green team (none for the red one). Then the green and the blue teams (e.g., the Widget and Gadget Inventories, if we want to stick to the figure) take care of their copies of the message and forward their replies to the aggregator, which consolidates the two messages into a single one again.

We are not going to build a complete application like this, because it would strongly overlap with the following JBoss ESB samples: aggregator, fun_cbr, simple_cbr, static_router, jms_router and maybe wiretap. Instead, we will focus on how these actions use the information that allows the aggregator to know exactly which messages it should pick. In the figure above, that is the task of the splitter. In JBoss ESB many classes, like ContentBasedRouter, StaticRouter, etc., add the following data, called "aggregatorTag", to the message:
- unique series id;
- message number inside the series;
- size of the series (i.e., how many messages exist);
- timestamp.

We can easily see that, after receiving a single message, the aggregator can determine how many more messages it should wait for and which other messages belong to the same group (it can even tell whether it is receiving a duplicate message).
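To make this reasoning concrete, here is a minimal sketch in plain Java of how a receiver could use the four fields listed above. The classes SeriesTag and SeriesTracker are hypothetical names invented for this illustration; they are not the JBoss ESB classes, and the real aggregator may store the tag and handle timeouts differently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the "aggregatorTag" data; NOT the JBoss ESB type. */
final class SeriesTag {
    final String seriesId;    // unique series id
    final int messageNumber;  // message number inside the series
    final int seriesSize;     // how many messages exist in the series
    final long timestamp;     // creation time of the series, in milliseconds

    SeriesTag(String seriesId, int messageNumber, int seriesSize, long timestamp) {
        this.seriesId = seriesId;
        this.messageNumber = messageNumber;
        this.seriesSize = seriesSize;
        this.timestamp = timestamp;
    }
}

/** Hypothetical tracker: remembers which message numbers arrived per series. */
final class SeriesTracker {
    enum Status { WAITING, DUPLICATE, COMPLETE }

    private final Map<String, Set<Integer>> seen = new HashMap<String, Set<Integer>>();

    /** Registers one tagged message and reports the state of its series. */
    Status accept(SeriesTag tag) {
        Set<Integer> numbers = seen.get(tag.seriesId);
        if (numbers == null) {
            numbers = new HashSet<Integer>();
            seen.put(tag.seriesId, numbers);
        }
        if (!numbers.add(tag.messageNumber)) {
            return Status.DUPLICATE; // this message number was already seen
        }
        return numbers.size() == tag.seriesSize ? Status.COMPLETE : Status.WAITING;
    }

    /** How many messages of this series have not arrived yet. */
    int missing(SeriesTag tag) {
        Set<Integer> numbers = seen.get(tag.seriesId);
        return tag.seriesSize - (numbers == null ? 0 : numbers.size());
    }

    /** Assumed timeout rule: the series is too old once timeoutMillis has passed. */
    static boolean isExpired(SeriesTag tag, long nowMillis, long timeoutMillis) {
        return nowMillis - tag.timestamp > timeoutMillis;
    }
}
```

With a series of size two, the first call to accept reports WAITING, a repeated message reports DUPLICATE, and the second distinct message reports COMPLETE. The timestamp is what makes a timeout check possible without any extra bookkeeping.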

The next question is how we can manage this "aggregatorTag" in JBoss ESB if we ever leave the bus. For instance, it is very likely that the message needs to enter some JMS queue (thus changing from its initial ESB format) and reenter later, after someone or some database checks the inventory. All we need to do is take care of the aggregatorTag.

I suggest the following code for that:

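// Wrap the current body and the aggregatorTag in a single map.
Map<String, Object> outmap = new HashMap<String, Object>();
outmap.put("body", message.getBody().get());
// The aggregatorTag lives in the message context, not in the body.
outmap.put("ContextInfo", message.getContext().getContext("aggregatorTag"));
// Replace the body of the ESB message with the map.
message.getBody().add(outmap);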


This stores the current body of the ESB message and the aggregatorTag (which is in the context of the message) in a map, and replaces the current body of the ESB message with this map. After the action that performs this transformation, we can have another action that sends the message to a JMS queue. The JMSRouter action will not touch the body of the message, so the map will be visible in an external Java virtual machine (JVM):

<action class="org.jboss.soa.esb.actions.routing.JMSRouter" name="routeToQueue">
    <property name="connection-factory" value="ConnectionFactory" />
    <property name="jndiName" value="queue/MyShop" />
    <property name="unwrap" value="true" />
</action>

In the JVM, we will receive an ObjectMessageProxy, which contains an ObjectMessage, which contains a body with the map we prepared before. When the JVM is ready to reply, it can prepare a new JMS message (use the type ObjectMessage) and use a similar map scheme to make the reply and the tag enter the ESB world in the body of an ESB message. Then, on the ESB side, just before the aggregator action, we can have the following to put the body reply and the aggregatorTag back in their place:
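As an illustration of the "similar map scheme" on the JVM side, the following sketch builds the reply map from the request map. It deliberately leaves out the JMS calls and only shows the map handling; the class ReplyMaps and the method buildReply are hypothetical names for this example, and the reply body is assumed to be a String.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for the external JVM; not part of JBoss ESB or JMS. */
final class ReplyMaps {
    // Same keys as the map created on the ESB side.
    static final String BODY_KEY = "body";
    static final String TAG_KEY = "ContextInfo";

    private ReplyMaps() {
    }

    /**
     * Builds the map to place in the reply ObjectMessage.
     * The tag is copied through untouched so the aggregator can still
     * match the reply with its series.
     */
    static Map<String, Object> buildReply(Map<String, Object> request, String replyBody) {
        Map<String, Object> reply = new HashMap<String, Object>();
        reply.put(BODY_KEY, replyBody);
        reply.put(TAG_KEY, request.get(TAG_KEY));
        return reply;
    }
}
```

Because an ObjectMessage carries a serializable object, the map itself and everything stored in it must be serializable. HashMap is, so the remaining concern is the reply body and the tag object.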

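// Unpack the map that the external JVM sent back in the message body.
Map<String, Object> map = (Map<String, Object>) message.getBody().get();
// Restore the reply as the body of the ESB message.
String body = (String) map.get("body");
message.getBody().add(body);
// Put the aggregatorTag back into the message context for the aggregator.
Object o = map.get("ContextInfo");
message.getContext().setContext("aggregatorTag", o);

In the end we have something like: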

(service1) ContentBasedRouter --> (service2) request map creation (see code) --> out queue --> request map reading in the Java Virtual Machine --> JVM takes care of the request --> reply map creation in the JVM --> (service3) in queue to the ESB --> action to read the reply (see code) --> Aggregator.

Simple?