1 package ejava.examples.jms10.jmsmechanics;
2
3 import static org.junit.Assert.*;
4
5
6 import java.io.ByteArrayInputStream;
7 import java.io.ByteArrayOutputStream;
8 import java.io.PrintWriter;
9 import java.io.Serializable;
10 import java.io.StringWriter;
11 import java.util.HashMap;
12 import java.util.Map;
13 import java.util.Properties;
14
15 import javax.jms.BytesMessage;
16 import javax.jms.Destination;
17 import javax.jms.JMSException;
18 import javax.jms.MapMessage;
19 import javax.jms.Message;
20 import javax.jms.MessageConsumer;
21 import javax.jms.MessageListener;
22 import javax.jms.MessageProducer;
23 import javax.jms.ObjectMessage;
24 import javax.jms.Session;
25 import javax.jms.Queue;
26 import javax.jms.StreamMessage;
27 import javax.jms.TextMessage;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34
35
36
37
38 public class MessageTest extends JMSTestBase {
39 static final Logger logger = LoggerFactory.getLogger(MessageTest.class);
40 protected Destination destination;
41
42 protected Session session = null;
43 protected MessageProducer producer = null;
44 protected MessageConsumer consumer = null;
45 protected MessageConsumer replyConsumer = null;
46 protected Destination replyDestination = null;
47 protected Replier client;
48
49 @Before
50 public void setUp() throws Exception {
51 destination = (Queue) lookup(queueJNDI);
52 assertNotNull("null destination:" + queueJNDI, destination);
53
54 emptyQueue();
55
56
57 session = connection.createSession(
58 false, Session.AUTO_ACKNOWLEDGE);
59 consumer = session.createConsumer(destination);
60 client = new Replier();
61 client.setSession(session);
62 consumer.setMessageListener(client);
63 producer = session.createProducer(destination);
64 replyDestination = session.createTemporaryQueue();
65 replyConsumer = session.createConsumer(replyDestination);
66 connection.start();
67 }
68
69 @After
70 public void tearDown() throws Exception {
71 if (client != null) { client.close(); }
72 if (connection != null) { connection.stop(); }
73 if (replyConsumer != null) { replyConsumer.close(); }
74 if (consumer != null) { consumer.close(); }
75 if (producer != null) { producer.close(); }
76 if (session != null) { session.close(); }
77 }
78
79 protected void emptyQueue() throws JMSException {
80 Session session = null;
81 MessageConsumer consumer = null;
82 try {
83 connection.stop();
84 session = connection.createSession(
85 false, Session.AUTO_ACKNOWLEDGE);
86 consumer = session.createConsumer(destination);
87 connection.start();
88 Message message = null;
89 do {
90 message = consumer.receiveNoWait();
91 logger.debug("clearing old message {}", message);
92 } while (message != null);
93 connection.stop();
94 }
95 finally {
96 if (consumer != null) { consumer.close(); }
97 if (session != null) { session.close(); }
98 }
99 }
100
101
102
103 private static class MyInteger implements Serializable {
104 private static final long serialVersionUID = 6914811570078480852L;
105 private int value;
106 public MyInteger(int value) { this.value = value; }
107 public int getValue() { return value; }
108 }
109
110 private class Replier implements MessageListener {
111 private MessageProducer producer;
112 public void setSession(Session session) throws JMSException {
113 producer = session.createProducer(null);
114 }
115 public void onMessage(Message request) {
116 try {
117 logger.debug("onMessage received:{}:{}",
118 request.getJMSMessageID(), request.getClass().getName());
119 Destination replyDestination = request.getJMSReplyTo();
120
121 Message reply = null;
122 if (request instanceof StreamMessage) {
123 reply = getReply((StreamMessage)request);
124 }
125 else if (request instanceof MapMessage) {
126 reply = getReply((MapMessage)request);
127 }
128 else if (request instanceof TextMessage) {
129 reply = getReply((TextMessage)request);
130 }
131 else if (request instanceof BytesMessage) {
132 reply = getReply((BytesMessage)request);
133 }
134 else if (request instanceof ObjectMessage) {
135 reply = getReply((ObjectMessage)request);
136 }
137 else {
138 reply = getReply(request);
139 }
140 reply.setJMSCorrelationID(request.getJMSMessageID());
141 producer.send(replyDestination, reply);
142
143 } catch (Exception ex) {
144 logger.error("error handling message", ex);
145 }
146 }
147 public void close() throws JMSException {
148 if (producer != null) { producer.close(); }
149 }
150
151 protected Message getReply(StreamMessage request) throws JMSException {
152 String operator = request.readString();
153 int operand1 = request.readInt();
154 int operand2 = request.readInt();
155 int result = ("add".equals(operator) ? operand1 + operand2 : -1);
156 StreamMessage reply = session.createStreamMessage();
157 reply.writeInt(result);
158 return reply;
159 }
160
161 protected Message getReply(MapMessage request) throws JMSException {
162 String operator = request.getString("operator");
163 int operand1 = request.getInt("operand1");
164 int operand2 = request.getInt("operand2");
165 int result = ("add".equals(operator) ? operand1 + operand2 : -1);
166 MapMessage reply = session.createMapMessage();
167 reply.setInt("result", result);
168 return reply;
169 }
170
171 protected Message getReply(TextMessage request) throws Exception {
172 logger.debug("text request body={}", request.getText());
173 Properties props = new Properties();
174 props.load(new ByteArrayInputStream(request.getText().getBytes()));
175 String operator = props.getProperty("operator");
176 int operand1 = Integer.parseInt(props.getProperty("operand1"));
177 int operand2 = Integer.parseInt(props.getProperty("operand2"));
178 int result = ("add".equals(operator) ? operand1 + operand2 : -1);
179 TextMessage reply = session.createTextMessage();
180 reply.setText(new Integer(result).toString());
181 return reply;
182 }
183
184 protected Message getReply(ObjectMessage request) throws Exception {
185 logger.debug("object request body={}", request.getObject());
186 @SuppressWarnings("unchecked")
187 Map<String, Object> body = (Map<String, Object>)request.getObject();
188 String operator = (String)body.get("operator");
189 int operand1 = ((MyInteger)body.get("operand1")).getValue();
190 int operand2 = ((MyInteger)body.get("operand2")).getValue();
191 int result = ("add".equals(operator) ? operand1 + operand2 : -1);
192 ObjectMessage reply = session.createObjectMessage();
193 reply.setObject(new MyInteger(result));
194 return reply;
195 }
196
197 protected Message getReply(BytesMessage request) throws JMSException {
198 logger.debug("body={} bytes", request.getBodyLength());
199 byte buffer[] = new byte[10];
200 request.readBytes(buffer, 3);
201 String operator = new String(buffer);
202 int operand1 = request.readByte();
203 int operand2 = request.readByte();
204 int result = (operator.startsWith("add") ? operand1 + operand2 : -1);
205 BytesMessage reply = session.createBytesMessage();
206 reply.writeInt(result);
207 return reply;
208 }
209
210 protected Message getReply(Message request) throws JMSException {
211 String operator = request.getStringProperty("operator");
212 int operand1 = request.getIntProperty("operand1");
213 int operand2 = request.getIntProperty("operand2");
214 int result = ("add".equals(operator) ? operand1 + operand2 : -1);
215 Message reply = session.createMessage();
216 reply.setIntProperty("result", result);
217 return reply;
218 }
219 }
220
221 @Test
222 public void testStreamMessage() throws Exception {
223 logger.info("*** testStreamMessage ***");
224
225 StreamMessage request = session.createStreamMessage();
226 request.writeString("add");
227 request.writeInt(2);
228 request.writeInt(3);
229
230 request.setJMSReplyTo(replyDestination);
231 producer.send(request);
232
233 StreamMessage response = (StreamMessage)replyConsumer.receive();
234 int result = response.readInt();
235 assertEquals("wrong answer:" + result, 5, result);
236 }
237
238 @Test
239 public void testMapMessage() throws Exception {
240 logger.info("*** testMapMessage ***");
241
242 MapMessage request = session.createMapMessage();
243 request.setString("operator", "add");
244 request.setInt("operand1", 2);
245 request.setInt("operand2", 3);
246
247 request.setJMSReplyTo(replyDestination);
248 producer.send(request);
249
250 MapMessage response = (MapMessage)replyConsumer.receive();
251 int result = response.getInt("result");
252 assertEquals("wrong answer:" + result, 5, result);
253 }
254
255 @Test
256 public void testTextMessage() throws Exception {
257 logger.info("*** testTextMessage ***");
258
259 TextMessage request = session.createTextMessage();
260 Properties props = new Properties();
261 props.put("operator", "add");
262 props.put("operand1", new Integer(2).toString());
263 props.put("operand2", new Integer(3).toString());
264 StringWriter bodyText = new StringWriter();
265 props.list(new PrintWriter(bodyText));
266 request.setText(bodyText.toString());
267
268 request.setJMSReplyTo(replyDestination);
269 producer.send(request);
270
271 TextMessage response = (TextMessage)replyConsumer.receive();
272 String resultStr = response.getText();
273 int result = Integer.parseInt(resultStr);
274 assertEquals("wrong answer:" + result, 5, result);
275 }
276
277 @Test
278 public void testObjectMessage() throws Exception {
279 logger.info("*** testObjectMessage ***");
280
281 ObjectMessage request = session.createObjectMessage();
282 Map<String, Serializable> body = new HashMap<String, Serializable>();
283 body.put("operator", "add");
284 body.put("operand1", new MyInteger(2));
285 body.put("operand2", new MyInteger(3));
286 request.setObject((Serializable)body);
287
288 request.setJMSReplyTo(replyDestination);
289 producer.send(request);
290
291 ObjectMessage response = (ObjectMessage)replyConsumer.receive();
292 int result = ((MyInteger)response.getObject()).getValue();
293 assertEquals("wrong answer:" + result, 5, result);
294 }
295
296 @Test
297 public void testBytesMessage() throws Exception {
298 logger.info("*** testBytesMessage ***");
299
300 ByteArrayOutputStream bos = new ByteArrayOutputStream();
301 bos.write("add".getBytes());
302 bos.write(2);
303 bos.write(3);
304
305 BytesMessage request = session.createBytesMessage();
306 request.writeBytes(bos.toByteArray());
307
308 request.setJMSReplyTo(replyDestination);
309 producer.send(request);
310
311 BytesMessage response = (BytesMessage)replyConsumer.receive();
312 int result = response.readInt();
313 assertEquals("wrong answer:" + result, 5, result);
314 }
315
316 @Test
317 public void testMessage() throws Exception {
318 logger.info("*** testMessage ***");
319
320 Message request = session.createMessage();
321 request.setStringProperty("operator", "add");
322 request.setIntProperty("operand1", 2);
323 request.setIntProperty("operand2", 3);
324
325 request.setJMSReplyTo(replyDestination);
326 producer.send(request);
327
328 Message response = replyConsumer.receive();
329 int result = response.getIntProperty("result");
330 assertEquals("wrong answer:" + result, 5, result);
331 }
332
333 }