J2EE Without the Application Server
Pages: 1, 2, 3, 4, 5, 6, 7
Step 5: Coding the MessageDrivenBank
In this step, we will add the JMS processing logic. In order to do this, we merely need to implement the JMS MessageListener interface. We also add the public setBank method to make Spring's dependency injection work. The source code is below.
package jms;
import jdbc.Bank;
import javax.jms.Message;
import javax.jms.MapMessage;
import javax.jms.MessageListener;
public class MessageDrivenBank
implements MessageListener
{
private Bank bank;
public void setBank ( Bank bank )
{
this.bank = bank;
}
//this method can be private
//since it is only needed within
//this class
private Bank getBank()
{
return this.bank;
}
public void onMessage ( Message msg )
{
try {
MapMessage m = ( MapMessage ) msg;
int account = m.getIntProperty ( "account" );
int amount = m.getIntProperty ( "amount" );
bank.withdraw ( account , amount );
System.out.println ( "Withdraw of " +
amount + " from account " + account );
}
catch ( Exception e ) {
e.printStackTrace();
//force rollback
throw new RuntimeException (
e.getMessage() );
}
}
}
Step 6: Configuring the MessageDrivenBank
Here we configure our MessageDrivenBank to listen on a transactional (JTA-aware) QueueReceiverSessionPool. This gives us the same message guarantees as EJB (no message loss nor duplicate messages), but with simple POJO objects instead. When a MessageListener is plugged into the pool, the pool will make sure that messages are received within a JTA/XA transaction. Combined with a JTA/XA-capable JDBC datasource, we get reliable messaging. The resulting Spring configuration can be found below:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
"http://www.springframework.org
/dtd/spring-beans.dtd">
<!--
NOTE: no explicit transaction manager bean
is necessary
because the QueueReceiverSessionPool will
start transactions by itself.
-->
<beans>
<bean id="datasource"
class="com.atomikos.jdbc.nonxa.NonXADataSourceBean">
<property name="user">
<value>sa</value>
</property>
<property name="url">
<value>jdbc:hsqldb:SpringNonXADB</value>
</property>
<property name="driverClassName">
<value>org.hsqldb.jdbcDriver</value>
</property>
<property name="poolSize">
<value>1</value>
</property>
<property name="connectionTimeout">
<value>60</value>
</property>
</bean>
<bean id="xaFactory"
class="org.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="queue"
class="org.activemq.message.ActiveMQQueue">
<property name="physicalName">
<value>BANK_QUEUE</value>
</property>
</bean>
<bean id="bank" class="jdbc.Bank">
<property name="dataSource">
<ref bean="datasource"/>
</property>
</bean>
<bean id="messageDrivenBank"
class="jms.MessageDrivenBank">
<property name="bank">
<ref bean="bank"/>
</property>
</bean>
<bean id="queueConnectionFactoryBean"
class="com.atomikos.jms.QueueConnectionFactoryBean">
<property name="resourceName">
<value>QUEUE_BROKER</value>
</property>
<property name="xaQueueConnectionFactory">
<ref bean="xaFactory"/>
</property>
</bean>
<bean id="queueReceiverSessionPool"
class="com.atomikos.jms.QueueReceiverSessionPool"
init-method="start">
<property name="queueConnectionFactoryBean">
<ref bean="queueConnectionFactoryBean"/>
</property>
<property name="transactionTimeout">
<value>120</value>
</property>
<!--
default license allows only limited
concurrency so keep pool small
-->
<property name="poolSize">
<value>1</value>
</property>
<property name="queue">
<ref bean="queue"/>
</property>
<property name="messageListener">
<ref bean="messageDrivenBank"/>
</property>
</bean>
</beans>
Because this article needs a JMS service that is easy to install, we will use ActiveMQ here. If you are using another JMS implementation, then you should still be able to apply the techniques outlined in this section. Next to the familiar datasource and bank objects, the following object definitions have been added:
- The
xaFactory: A connection factory for establishing JMS connections. - The
queue: This represents the JMSQueuewe will use, configured the way ActiveMQ requires. - The
queueConnectionFactoryBean: A JMS connector that is JTA-aware. - A
queueReceiverSessionPoolfor JTA-enabled message consumption. Note that we also specify an initialization method (i.e.,start) to be called; this is again a Spring feature. Thestartmethod is defined in the session pool class, and it is being referred to in the XML element of the Spring configuration file. - The
messageDrivenBankis responsible for processing the messages.
You may ask yourself where the transaction management has gone. Indeed, the objects that were added in the previous section have again disappeared. Why? Because we now use the QueueReceiverSessionPool to receive messages from JMS, and this class also starts a JTA transaction for each receive. We could have left the JTA configuration as it was and merely added the JMS elements, but it would have made the XML file a little longer. The session pool class now assumes the role of the transaction management added in Step 5. It works similar to the proxy approach; only this class needs a JMS MessageListener to add transactions to. With this configuration, a new transaction will be started before each message consumption, and this transaction will commit whenever our onMessage implementation returns normally. If there is a RuntimeException, then the transaction will be rolled back. The architecture is shown in Figure 5 (some of the JMS objects have been omitted for clarity).

Figure 5. Architecture for message-driven applications in Spring
The architecture now works as follows:
- The application retrieves the
bankobject and initializes the database tables if necessary. - The application retrieves the
queueReceiverSessionPool, thereby triggering a call of thestartmethod to begin listening for incoming messages. - The
queueReceiverSessionPooldetects a new message on the queue. - The
queueReceiverSessionPoolstarts a new transaction and registers with it. - The
queueReceiverSessionPoolcalls the registeredMessageListener(themessageDrivenBank, in our case). - This triggers a call on the
bank. - The
bankuses thedatasourceto access the database. - The
datasourceregisters with the transaction. - The database is accessed via JDBC.
- When the processing is done, the
queueReceiverSessionPoolterminates the transaction. Unless there is aRuntimeException, the desired outcome iscommit. - The transaction manager initiates two-phase commit with the message queue.
- The transaction manager initiates two-phase commit with the database.