View Javadoc
1   package ejava.examples.jms10.jmsmechanics;
2   
3   import static org.junit.Assert.*;
4   
5   
6   import java.io.ByteArrayInputStream;
7   import java.io.ByteArrayOutputStream;
8   import java.io.PrintWriter;
9   import java.io.Serializable;
10  import java.io.StringWriter;
11  import java.util.HashMap;
12  import java.util.Map;
13  import java.util.Properties;
14  
15  import javax.jms.BytesMessage;
16  import javax.jms.Destination;
17  import javax.jms.JMSException;
18  import javax.jms.MapMessage;
19  import javax.jms.Message;
20  import javax.jms.MessageConsumer;
21  import javax.jms.MessageListener;
22  import javax.jms.MessageProducer;
23  import javax.jms.ObjectMessage;
24  import javax.jms.Session;
25  import javax.jms.Queue;
26  import javax.jms.StreamMessage;
27  import javax.jms.TextMessage;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.junit.After;
32  import org.junit.Before;
33  import org.junit.Test;
34  
35  /**
36   * This test case performs a demonstration of using a each message type.
37   */
38  public class MessageTest extends JMSTestBase {
39      static final Logger logger = LoggerFactory.getLogger(MessageTest.class);
40      protected Destination destination;        
41      
42      protected Session session = null;
43      protected MessageProducer producer = null;
44      protected MessageConsumer consumer = null;
45      protected MessageConsumer replyConsumer = null;
46      protected Destination replyDestination = null;
47      protected Replier client;
48  
49      @Before
50      public void setUp() throws Exception {
51          destination = (Queue) lookup(queueJNDI);
52          assertNotNull("null destination:" + queueJNDI, destination);
53          
54          emptyQueue();
55  
56          //setup replies
57          session = connection.createSession(
58                  false, Session.AUTO_ACKNOWLEDGE);
59          consumer = session.createConsumer(destination);
60          client = new Replier();
61          client.setSession(session);
62          consumer.setMessageListener(client);                    
63          producer = session.createProducer(destination);            
64          replyDestination = session.createTemporaryQueue();
65          replyConsumer = session.createConsumer(replyDestination);
66          connection.start();
67      }
68      
69      @After
70      public void tearDown() throws Exception {
71          if (client != null) { client.close(); }
72          if (connection != null) { connection.stop(); }
73          if (replyConsumer != null) { replyConsumer.close(); }
74          if (consumer != null) { consumer.close(); }
75          if (producer != null) { producer.close(); }
76          if (session != null)  { session.close(); }
77      }
78      
79      protected void emptyQueue() throws JMSException {
80          Session session = null;
81          MessageConsumer consumer = null;
82          try {
83              connection.stop();
84              session = connection.createSession(
85                  false, Session.AUTO_ACKNOWLEDGE);
86              consumer = session.createConsumer(destination);
87              connection.start();
88              Message message = null;
89              do {
90                  message = consumer.receiveNoWait();
91                  logger.debug("clearing old message {}", message);
92              } while (message != null);
93              connection.stop();
94          }
95          finally {
96              if (consumer != null) { consumer.close(); }
97              if (session != null) { session.close(); }
98          }
99      }
100     
101     //this class is used to provide an example of a custom class sent within
102     //a serializable payload
103     private static class MyInteger implements Serializable {
104         private static final long serialVersionUID = 6914811570078480852L;
105         private int value;
106         public MyInteger(int value) { this.value = value; }
107         public int getValue() { return value; }
108     }
109     
110     private class Replier implements MessageListener {
111         private MessageProducer producer;
112         public void setSession(Session session) throws JMSException {
113             producer = session.createProducer(null);
114         }
115         public void onMessage(Message request) {
116             try {
117                 logger.debug("onMessage received:{}:{}", 
118                         request.getJMSMessageID(), request.getClass().getName());
119                 Destination replyDestination = request.getJMSReplyTo();
120                 
121                 Message reply = null;
122                 if (request instanceof StreamMessage) {
123                     reply = getReply((StreamMessage)request);
124                 }                
125                 else if (request instanceof MapMessage) {
126                     reply = getReply((MapMessage)request);
127                 }                
128                 else if (request instanceof TextMessage) {
129                     reply = getReply((TextMessage)request);
130                 }                
131                 else if (request instanceof BytesMessage) {
132                     reply = getReply((BytesMessage)request);
133                 }                
134                 else if (request instanceof ObjectMessage) {
135                     reply = getReply((ObjectMessage)request);
136                 }                
137                 else {
138                     reply = getReply(request);
139                 }
140                 reply.setJMSCorrelationID(request.getJMSMessageID());
141                 producer.send(replyDestination, reply);
142                 
143             } catch (Exception ex) {
144                 logger.error("error handling message", ex);
145             }
146         }        
147         public void close() throws JMSException {
148             if (producer != null) { producer.close(); }
149         }
150         
151         protected Message getReply(StreamMessage request) throws JMSException {
152             String operator = request.readString();
153             int operand1 = request.readInt();
154             int operand2 = request.readInt();
155             int result = ("add".equals(operator) ? operand1 + operand2 : -1);
156             StreamMessage reply = session.createStreamMessage();
157             reply.writeInt(result);
158             return reply;
159         }
160 
161         protected Message getReply(MapMessage request) throws JMSException {
162             String operator = request.getString("operator");
163             int operand1 = request.getInt("operand1");
164             int operand2 = request.getInt("operand2");
165             int result = ("add".equals(operator) ? operand1 + operand2 : -1);
166             MapMessage reply = session.createMapMessage();
167             reply.setInt("result", result);
168             return reply;
169         }
170 
171         protected Message getReply(TextMessage request) throws Exception {
172             logger.debug("text request body={}", request.getText());
173             Properties props = new Properties();
174             props.load(new ByteArrayInputStream(request.getText().getBytes()));
175             String operator = props.getProperty("operator");
176             int operand1 = Integer.parseInt(props.getProperty("operand1"));
177             int operand2 = Integer.parseInt(props.getProperty("operand2"));
178             int result = ("add".equals(operator) ? operand1 + operand2 : -1);
179             TextMessage reply = session.createTextMessage();
180             reply.setText(new Integer(result).toString());
181             return reply;
182         }
183         
184         protected Message getReply(ObjectMessage request) throws Exception {
185             logger.debug("object request body={}", request.getObject());
186             @SuppressWarnings("unchecked")
187 			Map<String, Object> body = (Map<String, Object>)request.getObject();
188             String operator =  (String)body.get("operator");
189             int operand1 = ((MyInteger)body.get("operand1")).getValue();
190             int operand2 = ((MyInteger)body.get("operand2")).getValue();
191             int result = ("add".equals(operator) ? operand1 + operand2 : -1);
192             ObjectMessage reply = session.createObjectMessage();
193             reply.setObject(new MyInteger(result));
194             return reply;
195         }
196 
197         protected Message getReply(BytesMessage request) throws JMSException {
198             logger.debug("body={} bytes", request.getBodyLength());
199             byte buffer[] = new byte[10];
200             request.readBytes(buffer, 3);
201             String operator = new String(buffer);
202             int operand1 = request.readByte();
203             int operand2 = request.readByte();
204             int result = (operator.startsWith("add") ? operand1 + operand2 : -1);
205             BytesMessage reply = session.createBytesMessage();
206             reply.writeInt(result);
207             return reply;
208         }
209         
210         protected Message getReply(Message request) throws JMSException {
211             String operator = request.getStringProperty("operator");
212             int operand1 = request.getIntProperty("operand1");
213             int operand2 = request.getIntProperty("operand2");
214             int result = ("add".equals(operator) ? operand1 + operand2 : -1);
215             Message reply = session.createMessage();
216             reply.setIntProperty("result", result);
217             return reply;
218         }
219     }
220     
221     @Test
222     public void testStreamMessage() throws Exception {
223         logger.info("*** testStreamMessage ***");
224         
225         StreamMessage request = session.createStreamMessage();        
226         request.writeString("add");
227         request.writeInt(2);
228         request.writeInt(3);
229 
230         request.setJMSReplyTo(replyDestination);
231         producer.send(request);
232 
233         StreamMessage response = (StreamMessage)replyConsumer.receive();
234         int result = response.readInt();
235         assertEquals("wrong answer:" + result, 5, result);
236     }
237     
238     @Test
239     public void testMapMessage() throws Exception {
240         logger.info("*** testMapMessage ***");
241         
242         MapMessage request = session.createMapMessage();        
243         request.setString("operator", "add");
244         request.setInt("operand1", 2);
245         request.setInt("operand2", 3);
246 
247         request.setJMSReplyTo(replyDestination);
248         producer.send(request);
249 
250         MapMessage response = (MapMessage)replyConsumer.receive();
251         int result = response.getInt("result");
252         assertEquals("wrong answer:" + result, 5, result);
253     }
254 
255     @Test
256     public void testTextMessage() throws Exception {
257         logger.info("*** testTextMessage ***");
258         
259         TextMessage request = session.createTextMessage();
260         Properties props = new Properties();
261         props.put("operator", "add");
262         props.put("operand1", new Integer(2).toString());
263         props.put("operand2", new Integer(3).toString());
264         StringWriter bodyText = new StringWriter();        
265         props.list(new PrintWriter(bodyText));
266         request.setText(bodyText.toString());
267 
268         request.setJMSReplyTo(replyDestination);
269         producer.send(request);
270 
271         TextMessage response = (TextMessage)replyConsumer.receive();
272         String resultStr = response.getText();
273         int result = Integer.parseInt(resultStr);
274         assertEquals("wrong answer:" + result, 5, result);
275     }
276 
277     @Test
278     public void testObjectMessage() throws Exception {
279         logger.info("*** testObjectMessage ***");
280         
281         ObjectMessage request = session.createObjectMessage();
282         Map<String, Serializable> body = new HashMap<String, Serializable>();
283         body.put("operator", "add");
284         body.put("operand1", new MyInteger(2));  //use a custom class as an
285         body.put("operand2", new MyInteger(3));  //example of serializable
286         request.setObject((Serializable)body);
287 
288         request.setJMSReplyTo(replyDestination);
289         producer.send(request);
290 
291         ObjectMessage response = (ObjectMessage)replyConsumer.receive();
292         int result = ((MyInteger)response.getObject()).getValue();
293         assertEquals("wrong answer:" + result, 5, result);
294     }
295 
296     @Test
297     public void testBytesMessage() throws Exception {
298         logger.info("*** testBytesMessage ***");
299        
300         ByteArrayOutputStream bos = new ByteArrayOutputStream();
301         bos.write("add".getBytes());
302         bos.write(2);
303         bos.write(3);
304         
305         BytesMessage request = session.createBytesMessage();
306         request.writeBytes(bos.toByteArray());
307 
308         request.setJMSReplyTo(replyDestination);
309         producer.send(request);
310 
311         BytesMessage response = (BytesMessage)replyConsumer.receive();
312         int result = response.readInt();
313         assertEquals("wrong answer:" + result, 5, result);
314     }
315 
316     @Test
317     public void testMessage() throws Exception {
318         logger.info("*** testMessage ***");
319 
320         Message request = session.createMessage();        
321         request.setStringProperty("operator", "add");
322         request.setIntProperty("operand1", 2);
323         request.setIntProperty("operand2", 3);
324 
325         request.setJMSReplyTo(replyDestination);
326         producer.send(request);
327 
328         Message response = replyConsumer.receive();
329         int result = response.getIntProperty("result");
330         assertEquals("wrong answer:" + result, 5, result);       
331     }
332 
333 }