Thursday, November 24, 2011

How to Reply to a Different JBoss ESB Service

Assume that you have a service S1 that cannot produce the reply itself. Only a second service S2, invoked later by someone else, has the reply that the client of S1 is waiting for.

A simple case where you may need this (although you can also solve it differently): S1 starts a jBPM process orchestrator that performs some actions, and you want the client invoking S1 to block until the orchestrator has the results ready. Since S1 will not wait for the orchestrator to finish, the orchestrator must call some service S2 to submit the results, and S2 must somehow send them back to the client of S1. In this case you can use the replyToOriginator tag in the ESBNotifier (see the Services Guide). Nevertheless, the following will also work.


Consider the other example I wrote here: Services with one-way requests (without reply) & how to reply later. At the end of it you can see a few lines that are supposed to send a message to the target you want:

LogicalEPR lepr = new LogicalEPR(message.getHeader().getCall().getReplyTo());
ServiceInvoker si = lepr.getServiceInvoker();
si.deliverAsync(message);

Unfortunately, this solution does not work in all cases. The problem lies in the address returned by getReplyTo(). In the previous case this was a logical end-point; more precisely, the reply-to of the message pointed to the JBpmCallbackService. What if the reply-to address is a physical end-point, say a queue? The previous three lines will simply not work (the first one will throw an exception).

To set up the problem, consider the following jboss-esb.xml.


<?xml version="1.0"?>

<jbossesb parameterReloadSecs="5"
xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd http://anonsvn.jboss.org/repos/labs/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd">
<providers>
<jms-provider connection-factory="ConnectionFactory"
name="JMS">
<jms-bus busid="ReplyLaterGWChannel">
<jms-message-filter dest-name="queue/reply_later_Request_gw"
dest-type="QUEUE" />
</jms-bus>
<jms-bus busid="ReplyLaterEsbChannel">
<jms-message-filter dest-name="queue/reply_later_Request_esb"
dest-type="QUEUE" />
</jms-bus>
<jms-bus busid="ReplyLaterEsbChannel2">
<jms-message-filter dest-name="queue/reply_later_Request_esb2"
dest-type="QUEUE" />
</jms-bus>
</jms-provider>
</providers>
<services>
<service category="My_ReplyLater_Service"
description="Reply Late Service: Use this service to invoke the hello"
name="Hello">
<listeners>
<jms-listener busidref="ReplyLaterGWChannel"
is-gateway="true" name="JMS-Gateway" />
<jms-listener busidref="ReplyLaterEsbChannel" name="ESB-Listener" />
</listeners>
<actions mep="OneWay">
<action class="actions.MyListenerAction" name="helloaction"
process="hello" />
<action class="org.jboss.soa.esb.actions.SystemPrintln" name="printMessage">
<property name="message" value="JMS Secured Quickstart message" />
<property name="printfull" value="true" />
</action>

<action name="routeAction" class="org.jboss.soa.esb.actions.StaticRouter">
<property name="destinations">
<route-to destination-name="other-hello"
service-category="My_ReplyLater_Service" service-name="Hello2" />
</property>
</action>

<action class="actions.MyListenerAction" name="helloaction2"
process="hello2" />
</actions>
</service>
<service category="My_ReplyLater_Service"
description="Reply Late Service: Use this service to invoke the hello"
name="Hello2">
<listeners>
<jms-listener busidref="ReplyLaterEsbChannel2" name="ESB-Listener2" />
</listeners>
<actions mep="OneWay">
<action class="actions.MyListenerAction" name="helloaction3"
process="hello3" />
<action class="org.jboss.soa.esb.actions.SystemPrintln" name="printMessage">
<property name="message" value="JMS Secured Quickstart message" />
<property name="printfull" value="true" />
</action>
</actions>
</service>
</services>
</jbossesb>

We have two services:

  • My_ReplyLater_Service/Hello starts with helloaction, prints the message, routes it to My_ReplyLater_Service/Hello2, and is supposed to finish with helloaction2 (you will see that the service never reaches helloaction2).
  • My_ReplyLater_Service/Hello2 invokes helloaction3 and prints the message.

A crucial point here is that both services are OneWay, i.e., they do not return results to their callers. In this demonstration, the client invokes My_ReplyLater_Service/Hello synchronously, but the service itself never replies, because it is OneWay. Instead, we "manually" send the reply from helloaction3, which belongs to a different service.


package actions;


import org.jboss.soa.esb.actions.AbstractActionLifecycle;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.couriers.Courier;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.message.Message;

public class MyListenerAction extends AbstractActionLifecycle
{

protected ConfigTree _config;

public MyListenerAction(ConfigTree config) {
_config = config;
}

public Message hello(Message message) {
System.out.println("---------------------------------- hello ----------------------------------");
//String xmlEPR = EPRHelper.toXMLString(message.getHeader().getCall().getReplyTo());
//message.getBody().add("replyToAddress", xmlEPR);
System.out.println("-------------------------------- end hello --------------------------------");
return message;
}

public Message hello2(Message message) {
System.out.println("---------------------------------- hello 2 ----------------------------------");
System.out.println("-------------------------------- end hello 2 --------------------------------");
return null;
}

public Message hello3(Message message) throws CourierException, MalformedEPRException {
System.out.println("---------------------------------- hello 3 ----------------------------------");
System.out.println(message);
//String xml = (String) message.getBody().get("replyToAddress");
//EPR jmsepr = (EPR) EPRHelper.fromXMLString(xml);
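// The reply-to set by the client may be a physical EPR (here, a JMS queue)
EPR jmsepr = message.getHeader().getCall().getReplyTo();
message.getHeader().getCall().setTo(jmsepr);

// A Courier accepts physical EPRs; clean it up even if deliver() fails
Courier courier = CourierFactory.getCourier(jmsepr);
try {
courier.deliver(message);
} finally {
courier.cleanup();
}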
System.out.println("-------------------------------- end hello 3 --------------------------------");

return null;
}

}

We take the JMS End-Point Reference (EPR) from the reply-to field of the message and make it the destination of the message with setTo(). That destination is the client that invoked the My_ReplyLater_Service/Hello service and provided a reply-to field in the message. Note that this is only possible because this case is fairly simple and uses a single message end to end. In other cases, we might need to store and retrieve the reply-to field using some other mechanism. The commented-out lines show how to carry it in a separate body field of the message, using the EPRHelper class (in case the replyTo is changed somewhere along the way).

The Courier does the hard work for us, because it accepts physical EPRs (in fact, the ServiceInvoker also uses it). Unfortunately, the JBoss ESB documentation says very little about it. Courier is actually an interface; the concrete class behind this example is JmsCourier which, as the name says, uses JMS. The courier.cleanup() line closes the JMS MessageProducer. If it is left open, it may block other MessageProducers that later try to write to the same queue; I saw cases where the service would block without it.
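To make the message flow easier to follow, here is a minimal plain-Java simulation of the same pattern. It uses no JBoss ESB classes at all: in-memory BlockingQueues stand in for the JMS queues, and the names Msg, s1, s2 and call are hypothetical, chosen only for this sketch. It shows a one-way S1 that forwards the message, and a different service S2 that replies to the reply-to address the client supplied.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java simulation of the reply-later pattern; no JBoss ESB classes are used.
class ReplyLaterSketch {

    // Hypothetical message: a body plus the queue on which the client expects the reply.
    static final class Msg {
        final String body;
        final BlockingQueue<Msg> replyTo;

        Msg(String body, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Stands in for Hello: one-way, so it only forwards the message and replies nothing.
    static void s1(Msg in, BlockingQueue<Msg> s2Inbox) throws InterruptedException {
        s2Inbox.put(in); // the reply-to travels along with the message
    }

    // Stands in for Hello2: builds the reply and sends it to the original reply-to.
    static void s2(BlockingQueue<Msg> inbox) throws InterruptedException {
        Msg in = inbox.take();
        in.replyTo.put(new Msg("Hello, you said: " + in.body, null));
    }

    // Client side: send the request, then block on its own reply queue with a timeout.
    static String call(String text, long timeoutMillis) throws InterruptedException {
        BlockingQueue<Msg> replyQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Msg> s2Inbox = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                s2(s2Inbox);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        s1(new Msg(text, replyQueue), s2Inbox);
        Msg reply = replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        worker.join();
        return reply == null ? null : reply.body;
    }
}
```

Calling ReplyLaterSketch.call("hi", 2000) blocks until s2 has answered. The client never learns which service produced the reply; it only waits on the address it handed out, which is the point of the pattern.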


Now, the other files. The deployment.xml:

<?xml version="1.0"?>

<jbossesb-deployment>
  <jmsQueue>reply_later_Request_gw</jmsQueue>
  <jmsQueue>reply_later_Request_esb</jmsQueue>
  <jmsQueue>reply_later_Request_esb2</jmsQueue>
  <jmsQueue>reply_later_Request_esb_reply</jmsQueue>
</jbossesb-deployment>

The jbm-queue-service.xml:


<?xml version="1.0" encoding="UTF-8"?>
<server>

<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.esb.quickstart.destination:service=Queue,name=reply_later_Request_gw"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer
</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.esb.quickstart.destination:service=Queue,name=reply_later_Request_esb"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer
</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.esb.quickstart.destination:service=Queue,name=reply_later_Request_esb2"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer
</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.esb.quickstart.destination:service=Queue,name=reply_later_Request_esb_reply"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer
</depends>
</mbean>
</server>

This is what the ESB project looks like in my JBoss Developer Studio:



Now, to synchronously invoke the Hello service, refer to my other post, and use the following arguments:

My_ReplyLater_Service Hello "my text goes here"

In the end you may want to print the contents of the message like this:

System.out.println("I said: " + reply.getBody().get());

Saturday, November 12, 2011

Using Objects in JBoss ESB

Sending objects back and forth in JBoss ESB is actually easy! Let's create a service that receives a List<String> (an object) and transforms the list into a Set<String> (another object). This removes any duplicate entries:

package data;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.jboss.soa.esb.actions.AbstractActionLifecycle;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;

public class MyListenerAction extends AbstractActionLifecycle
{

protected ConfigTree _config;

public MyListenerAction(ConfigTree config) {
_config = config;
}

public Message transform(Message message) throws MessageDeliverException {
System.out.println("---------------------------------- transform ----------------------------------");
@SuppressWarnings("unchecked")
List<String> names = (List<String>) message.getBody().get();
Set<String> nameset = new HashSet<String>(names);
message.getBody().add(nameset);
System.out.println("-------------------------------- end transform --------------------------------");
return message;
}

}
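The core of the transform action is plain Java and can be tried without an ESB. The sketch below isolates it; the class and method names are hypothetical, and it uses LinkedHashSet instead of the HashSet above (an assumption made only so that the first-seen order of the names is preserved, which HashSet does not guarantee).

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class DedupSketch {
    // Copying a List into a Set drops duplicates.
    // LinkedHashSet additionally keeps the order in which names were first seen.
    static Set<String> dedupe(List<String> names) {
        return new LinkedHashSet<>(names);
    }
}
```

With the names used by the client in this post, the second "Joao" disappears and four names remain.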


We can define this service as follows in jboss-esb.xml:
<?xml version="1.0"?>
<jbossesb parameterReloadSecs="5"
xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd http://anonsvn.jboss.org/repos/labs/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.3.0.xsd">
<providers>
<jms-provider connection-factory="ConnectionFactory"
name="JMS">
<jms-bus busid="transformEsbChannel">
<jms-message-filter dest-name="queue/transform_Request_esb"
dest-type="QUEUE" />
</jms-bus>
</jms-provider>
</providers>
<services>
<service category="Objects_in_Messages_Service"
description="Object in Messages: Use this service to invoke the service"
name="send">
<listeners>
<jms-listener busidref="transformEsbChannel" name="ESB-Listener" />
</listeners>
<actions>
<action class="data.MyListenerAction" name="transformaction"
process="transform" />
</actions>
</service>
</services>
</jbossesb>

the deployment.xml:
<?xml version="1.0"?>
<jbossesb-deployment>
<jmsQueue>transform_Request_esb</jmsQueue>
<jmsQueue>transform_Request_esb_reply</jmsQueue>
</jbossesb-deployment>

and the jbm-queue-service.xml. Let me point out the detail that we need a queue for the service to reply:
<?xml version="1.0" encoding="UTF-8"?>
<server>

<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.esb.quickstart.destination:service=Queue,name=transform_Request_esb"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer
</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.esb.quickstart.destination:service=Queue,name=transform_Request_esb_reply"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer
</depends>
</mbean>
</server>


This is how the project looks:





And now we need a project for the client (refer to my other post about the Asynchronous Invoker). The difference is that this example uses a synchronous invoker: the method we call is now deliverSync(). So the main class of this project is:
import java.util.Arrays;
import java.util.List;
import java.util.Set;

import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.client.ServiceInvoker;

public class ExchangeObjects
{
public static void main(String args[]) throws Exception
{
// Setting the ConnectionFactory such that it will use scout
System.setProperty("javax.xml.registry.ConnectionFactoryClass","org.apache.ws.scout.registry.ConnectionFactoryImpl");

String names[] = {"Jose", "Joao", "Pedro", "Paula", "Joao"};
List<String> mylist = Arrays.asList(names);

Message esbMessage = MessageFactory.getInstance().getMessage();
esbMessage.getBody().add(mylist);

ServiceInvoker si = new ServiceInvoker("Objects_in_Messages_Service", "send");
Message retMessage = si.deliverSync(esbMessage, 10000L);

@SuppressWarnings("unchecked")
Set<String> resultset = (Set<String>) retMessage.getBody().get();
System.out.println("Results. Only one Joao should exist...");
for (String name : resultset)
System.out.println(name);
}

}
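The second argument of deliverSync() above is a timeout in milliseconds. What "block until the reply arrives or the timeout expires" means can be sketched in plain Java, with no ESB classes involved. Everything here is hypothetical: service() stands in for the remote service, delayMillis simulates slow processing, and returning null on timeout is a choice made for this sketch, not a statement about how the ServiceInvoker reports a timeout.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncCallSketch {
    // Hypothetical stand-in for the remote service; delayMillis simulates slow processing.
    static Set<String> service(List<String> names, long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis);
        return new LinkedHashSet<>(names);
    }

    // Blocks for at most timeoutMillis; returns null if no reply arrived in time.
    static Set<String> callSync(List<String> names, long delayMillis, long timeoutMillis)
            throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Set<String>> task = () -> service(names, delayMillis);
            Future<Set<String>> reply = pool.submit(task);
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // the service did not answer within the timeout
        } finally {
            pool.shutdownNow(); // interrupts the worker if it is still running
        }
    }
}
```

A generous timeout costs nothing when the service is fast, because the call returns as soon as the reply is available.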

To give you some help, this is what the project looks like in my JBoss Developer Studio. Refer to my other post about Asynchronous Invoking for further help:


Services with one-way requests (without reply) & how to reply later

Now, let's play with actions that don't return any result. It's quite easy. All you need to do is add the attribute mep="OneWay" to the actions element:

<actions mep="OneWay">

<action class="action.MyListenerAction" name="helloaction" process="hello" />
</actions>

What happens as you run the service:

16:35:38,855 INFO  [STDOUT] ---------------------------------- Initialize ----------------------------------
16:35:38,855 INFO  [STDOUT] Initialize got the following data. User = Paul. Text = I would like to say good morning to everybody!
16:35:38,856 INFO  [STDOUT] -------------------------------- end initialize --------------------------------

16:35:39,107 INFO  [STDOUT] MessageActionHandler: Going to the first state!
16:35:39,130 INFO  [InquiryHelper] uddi:juddi.apache.org:737e3ce5-cb96-4c3c-9146-00d9c7baa300 is modified Sat Nov 12 16:35:27 WET 2011 1321115727960
16:35:39,253 INFO  [STDOUT] ---------------------------------- hello ----------------------------------
16:35:39,254 INFO  [STDOUT] -------------------------------- end hello --------------------------------

You don't get to MessageActionHandler2. Why? Because My_Demo_Service/Hello never replies to the first state of the orchestrator.


Remember: in the first state we would call My_Demo_Service/Hello and get some response. As we now have a OneWay service, no response ever comes back. How do we pass the process to the next state? We need to signal the process in the jbpm console. Just like here: 


This will let the process go to the final state and finish. At this point, MessageActionHandler2 is finally invoked and you can see the following in the JBoss ESB console:

16:51:21,059 INFO  [STDOUT] Inside MessageActionHandler2
16:51:21,063 INFO  [STDOUT] I would like to say good morning to everybody!

You should notice that the changes introduced by the My_Demo_Service/Hello service are not received by the process.

Next, we would like the process to stay suspended until some action occurs (possibly much later) and then have the reply automatically move the process to the final state with all the information. That turns out to be very easy. A few lines in the MyListenerAction class will "manually" send the reply message back to the "first" state of the orchestrator:



package action;
/*
 * JBoss, Home of Professional Open Source
 * Copyright 2006, JBoss Inc., and others contributors as indicated
 * by the @authors tag. All rights reserved.
 * See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU Lesser General Public License, v. 2.1.
 * This program is distributed in the hope that it will be useful, but WITHOUT A
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
 * You should have received a copy of the GNU Lesser General Public License,
 * v.2.1 along with this distribution; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 * MA  02110-1301, USA.
 *
 * (C) 2005-2006,
 * @author JBoss Inc.
 */


import org.jboss.soa.esb.actions.AbstractActionLifecycle;
import org.jboss.soa.esb.addressing.eprs.LogicalEPR;
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;

public class MyListenerAction extends AbstractActionLifecycle
{

protected ConfigTree _config;

public MyListenerAction(ConfigTree config) {
_config = config;
}

public Message hello(Message message) throws MessageDeliverException {
System.out.println("---------------------------------- hello ----------------------------------");
String who = (String) message.getBody().get("whoami");
String textinbody = (String) message.getBody().get();
message.getBody().add("Hello " + who + ". You said: " + textinbody);

LogicalEPR lepr = new LogicalEPR(message.getHeader().getCall().getReplyTo());
ServiceInvoker si = lepr.getServiceInvoker();
si.deliverAsync(message);

System.out.println("-------------------------------- end hello --------------------------------");
return message;
}

}