no

How to implement JMS request/response feature in Glassfish

This tutorial will provide a sample code on how you can send request and receive reply using jms. I've used Glassfish 3.1.1, with its bu...

This tutorial provides sample code showing how to send a request and receive a reply using JMS. I've used Glassfish 3.1.1, with its built-in JMS broker set to LOCAL for testing. We need 2 Maven projects (one sends the request; the other receives the request and creates a response). Let's call the projects jmsrequest and jmsreply. As usual, I prefer to explain by code :-). jmsrequest project (3 files): StartupListener.java
package com.ipiel.jmsrequest;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.jms.Connection;
import javax.jms.QueueConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.transaction.UserTransaction;

@Startup
@Singleton
@TransactionManagement(TransactionManagementType.BEAN)
public class StartupListener {
 private Connection conn = null;
 private Requestor requestor = null;
 @Resource
 private UserTransaction utx;

 @PostConstruct
 void init() {
  Context jndi;
  try {
   jndi = new InitialContext();
   QueueConnectionFactory factory = (QueueConnectionFactory) jndi
     .lookup("jms/sidoQueueFactory");
   conn = factory.createConnection();
   requestor = Requestor.newRequestor(conn, "jms/testRequestQueue",
     "jms/testReplyQueue", "jms/testInvalidQueue");
   send();
   synchronized (this) {
    try {
     wait(2000);
    } catch (InterruptedException e) {
    }
   }
   readReply();
   conn.close();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
 
 private void send() throws Exception {
  utx.begin();
  requestor.send();
  utx.commit();
 } 

 private void readReply() throws Exception {
  utx.begin();
  requestor.receiveSync();
  utx.commit();
 }
}
Requestor.java
package com.ipiel.jmsrequest;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

/**
 * Sends a single text request and synchronously waits for its reply.
 *
 * Replies are received on a temporary queue created from the session; the
 * {@code replyQueueName} argument is accepted but not used. A producer for
 * the invalid-message queue is created during initialization, but no method
 * in this class sends through it.
 */
public class Requestor {

 /** How long {@link #receiveSync()} waits for a reply, in milliseconds. */
 private static final long REPLY_TIMEOUT_MILLIS = 5000;

 private Session session;
 private Destination replyQueue;
 private MessageProducer requestProducer;
 private MessageConsumer replyConsumer;
 private MessageProducer invalidProducer;

 protected Requestor() {
  super();
 }

 /**
  * Creates and initializes a requestor on the given connection.
  *
  * @param connection       JMS connection used to create the session
  * @param requestQueueName JNDI name of the queue requests are sent to
  * @param replyQueueName   not used; replies go to a temporary queue
  * @param invalidQueueName JNDI name of the invalid-message queue
  * @return the initialized requestor
  * @throws JMSException    if a JMS object cannot be created
  * @throws NamingException if a queue lookup fails
  */
 public static Requestor newRequestor(Connection connection,
   String requestQueueName, String replyQueueName,
   String invalidQueueName) throws JMSException, NamingException {

  Requestor requestor = new Requestor();
  requestor.initialize(connection, requestQueueName, replyQueueName,
    invalidQueueName);
  return requestor;
 }

 /**
  * Creates the session, the destinations and the producers/consumer, then
  * starts the connection so the reply consumer can receive messages.
  *
  * @throws NamingException if a queue lookup fails
  * @throws JMSException    if a JMS object cannot be created
  */
 protected void initialize(Connection connection, String requestQueueName,
   String replyQueueName, String invalidQueueName)
   throws NamingException, JMSException {

  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  Destination requestQueue = JndiUtil.getDestination(requestQueueName);
  replyQueue = session.createTemporaryQueue();
  Destination invalidQueue = JndiUtil.getDestination(invalidQueueName);

  requestProducer = session.createProducer(requestQueue);
  replyConsumer = session.createConsumer(replyQueue);
  invalidProducer = session.createProducer(invalidQueue);

  connection.start();
 }

 /**
  * Sends one "Hello world." request whose reply-to is the temporary reply
  * queue, then closes the request producer. The producer is closed even if
  * the send fails.
  *
  * @throws Exception if the message cannot be created or sent
  */
 public void send() throws Exception {
  try {
   TextMessage requestMessage = session.createTextMessage();
   requestMessage.setText("Hello world.");
   requestMessage.setJMSReplyTo(replyQueue);
   requestProducer.send(requestMessage);
  } finally {
   requestProducer.close();
  }
 }

 /**
  * Waits up to five seconds for a reply, prints it, then closes the reply
  * consumer. The consumer is closed even if the receive fails.
  *
  * @throws Exception if the receive fails
  */
 public void receiveSync() throws Exception {
  try {
   System.out.println("Listening to reply " + replyConsumer);
   Message msg = replyConsumer.receive(REPLY_TIMEOUT_MILLIS);
   System.out.println("reply: " + msg);

   if (msg == null) {
    // receive() returns null on timeout; that is not an invalid message.
    System.out.println("No reply received within "
      + REPLY_TIMEOUT_MILLIS + " ms");
   } else if (!(msg instanceof TextMessage)) {
    System.out.println("Invalid message detected");
   }
  } finally {
   replyConsumer.close();
  }
 }
}
JndiUtil.java
package com.ipiel.jmsrequest;

import javax.jms.Destination;
import javax.jms.Queue;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** JNDI lookup helper for JMS destinations. */
public class JndiUtil {
 /**
  * Looks up a JMS queue by its JNDI name.
  *
  * @param requestQueueName JNDI name of the queue to look up
  * @return the queue bound under that name
  * @throws NamingException   if the lookup fails
  * @throws ClassCastException if the bound object is not a {@link Queue}
  */
 public static Destination getDestination(String requestQueueName) throws NamingException {
  Context jndi = new InitialContext();
  try {
   return (Queue) jndi.lookup(requestQueueName);
  } finally {
   try {
    // Release the context's resources instead of leaving them to the GC.
    jndi.close();
   } catch (NamingException ignored) {
    // A close failure must not replace the lookup's result or exception.
   }
  }
 }
}
Note that the request starts from the StartupListener; this bean starts automatically after deployment. jmsreply project (contains a single MDB): Replier.java
package com.ipiel.jmsreply;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

@MessageDriven(mappedName = "jms/testRequestQueue", activationConfig = {
  @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") })
public class Replier implements MessageListener {
 public static Replier newReplier(Connection connection,
   String requestQueueName, String invalidQueueName)
   throws JMSException, NamingException {
  Replier replier = new Replier();
  replier.initialize(connection, requestQueueName, invalidQueueName);
  return replier;
 }

 public void initialize(Connection connection, String requestQueueName,
   String invalidQueueName) throws NamingException, JMSException {
  System.out.println("Init replier.");
 }

 public void onMessage(Message message) {
  try {
   if ((message instanceof TextMessage)
     && (message.getJMSReplyTo() != null)) {
    TextMessage requestMessage = (TextMessage) message;

    System.out.println("Received request");
   
    try {
     Context jndi = new InitialContext();
     QueueConnectionFactory factory = (QueueConnectionFactory) jndi
       .lookup("jms/sidoQueueFactory");
     Connection conn = factory.createConnection();
     Session session = conn.createSession(false,
       Session.AUTO_ACKNOWLEDGE);

     String contents = requestMessage.getText();
     Destination replyDestination = message.getJMSReplyTo();
     MessageProducer replyProducer = session
       .createProducer(replyDestination);

     TextMessage replyMessage = session.createTextMessage();
     replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
     replyMessage.setText(contents);
     replyProducer.send(replyMessage);
     replyProducer.close();
     System.out.println("Sent reply");    
    } catch (Exception e1) {

    }
   } else {
    System.out.println("Invalid message detected");   
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}
Things you should be extra careful about:
1.) Before reading synchronously from a JMS queue, make sure to start the connection by invoking the Connection.start() method.
2.) Make sure to create the MessageConsumer and MessageProducer with the same Session and Connection.
3.) It's easier to implement request/response if your EJB's transaction management is set to BEAN, which can be achieved with the following annotation:
@TransactionManagement(TransactionManagementType.BEAN)

And that's it, you should now have a working jms request/response project.

Related

java 6491120288395301114

Post a Comment

item