1 package ejava.examples.jms10.jmsmechanics;
2
3 import static org.junit.Assert.*;
4
5
6 import java.util.ArrayList;
7 import java.util.LinkedList;
8 import java.util.List;
9
10 import javax.jms.Destination;
11 import javax.jms.JMSException;
12 import javax.jms.Message;
13 import javax.jms.MessageConsumer;
14 import javax.jms.MessageListener;
15 import javax.jms.MessageProducer;
16 import javax.jms.Session;
17 import javax.jms.Queue;
18
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import org.junit.Before;
22 import org.junit.Test;
23
24
25
26
27
28
29
30
31 public class MessageSelectorQueueTest extends JMSTestBase {
32 static final Logger logger = LoggerFactory.getLogger(MessageSelectorQueueTest.class);
33 protected Destination destination;
34
35 @Before
36 public void setUp() throws Exception {
37 destination = (Queue) lookup(queueJNDI);
38 assertNotNull("null destination:" + queueJNDI, destination);
39 }
40
41 private interface MyClient {
42 int getCount();
43 Message getMessage() throws Exception;
44 }
45 private class AsyncClient implements MessageListener, MyClient {
46 private int count=0;
47 private LinkedList<Message> messages = new LinkedList<Message>();
48 public void onMessage(Message message) {
49 try {
50 logger.debug("onMessage received ({}):{}, level={}",
51 ++count , message.getJMSMessageID(), message.getStringProperty("level"));
52 message.acknowledge();
53 synchronized(messages) {
54 messages.add(message);
55 }
56 } catch (JMSException ex) {
57 logger.error("error handling message", ex);
58 }
59 }
60 public int getCount() { return count; }
61 public Message getMessage() {
62 synchronized(messages) {
63 return (messages.isEmpty() ? null : messages.remove());
64 }
65 }
66 }
67
68 private class SyncClient implements MyClient {
69 private MessageConsumer consumer;
70 private int count=0;
71 public SyncClient(MessageConsumer consumer) {
72 this.consumer = consumer;
73 }
74 public int getCount() { return count; }
75 public Message getMessage() throws JMSException {
76 Message message=consumer.receiveNoWait();
77 if (message != null) {
78 logger.debug("receive ({}):{}, level={}",
79 ++count , message.getJMSMessageID(), message.getStringProperty("level"));
80 message.acknowledge();
81 }
82 return message;
83 }
84 }
85
86 @Test
87 public void testMessageSelector() throws Exception {
88 logger.info("*** testMessageSelector ***");
89 Session session = null;
90 Session asyncSession = null;
91 MessageProducer producer = null;
92 MessageConsumer asyncConsumer = null;
93 MessageConsumer syncConsumer = null;
94 try {
95 connection.stop();
96
97 session = connection.createSession(
98 false, Session.CLIENT_ACKNOWLEDGE);
99
100 asyncSession = connection.createSession(
101 false, Session.CLIENT_ACKNOWLEDGE);
102 List<MyClient> clients = new ArrayList<MyClient>();
103
104
105
106 String selector1 = "level in ('warn', 'fatal')";
107 asyncConsumer = asyncSession.createConsumer(destination, selector1);
108 AsyncClient asyncClient = new AsyncClient();
109 asyncConsumer.setMessageListener(asyncClient);
110 clients.add(asyncClient);
111
112
113
114 String selector2 = "level in ('debug', 'info','warn', 'fatal')";
115 syncConsumer = session.createConsumer(destination, selector2);
116 SyncClient syncClient = new SyncClient(syncConsumer);
117 clients.add(syncClient);
118
119 String levels[] = {"info", "warn", "fatal"};
120 producer = session.createProducer(destination);
121 Message message = session.createMessage();
122 for (String level : levels) {
123 message.setStringProperty("level", level);
124 producer.send(message);
125 logger.info("sent msgId={}, level={}",
126 message.getJMSMessageID(), message.getStringProperty("level"));
127 }
128
129 connection.start();
130 int receivedCount=0;
131 for(int i=0; i<10; i++) {
132 for(MyClient client: clients) {
133 Message m = client.getMessage();
134 receivedCount += (m != null ? 1 : 0);
135 }
136 if (receivedCount == 3) { break; }
137 logger.debug("waiting for messages...");
138 Thread.sleep(1000);
139 }
140 logger.info("asyncClient received {} msgs", asyncClient.getCount());
141 logger.info("syncClient received {} msgs", syncClient.getCount());
142 assertEquals(3, asyncClient.getCount()+ syncClient.getCount());
143 }
144 finally {
145 if (connection != null) { connection.stop(); }
146 if (asyncConsumer != null) { asyncConsumer.close(); }
147 if (syncConsumer != null) { syncConsumer.close(); }
148 if (producer != null) { producer.close(); }
149 if (session != null) { session.close(); }
150 if (asyncSession != null) { asyncSession.close(); }
151 }
152 }
153
154 @Test
155 public void testMessageSelectorMulti() throws Exception {
156 logger.info("*** testMessageSelectorMulti ***");
157 Session session = null;
158 Session asyncSession = null;
159 MessageProducer producer = null;
160 MessageConsumer asyncConsumer = null;
161 MessageConsumer syncConsumer = null;
162 try {
163 connection.stop();
164
165 session = connection.createSession(
166 false, Session.CLIENT_ACKNOWLEDGE);
167
168 asyncSession = connection.createSession(
169 false, Session.CLIENT_ACKNOWLEDGE);
170 List<MyClient> clients = new ArrayList<MyClient>();
171
172
173
174 String selector1 = "level in ('warn', 'fatal')";
175 asyncConsumer = asyncSession.createConsumer(destination, selector1);
176 AsyncClient asyncClient = new AsyncClient();
177 asyncConsumer.setMessageListener(asyncClient);
178 clients.add(asyncClient);
179
180
181
182 String selector2 = "level in ('debug', 'info','warn', 'fatal')";
183 syncConsumer = session.createConsumer(destination, selector2);
184 SyncClient syncClient = new SyncClient(syncConsumer);
185 clients.add(syncClient);
186
187 String levels[] = {"info", "warn", "fatal"};
188 producer = session.createProducer(destination);
189 Message message = session.createMessage();
190 for (int i=0; i<msgCount; i++) {
191 for (String level : levels) {
192 message.setStringProperty("level", level);
193 producer.send(message);
194 logger.info("sent msgId={}, level={}",
195 message.getJMSMessageID(), message.getStringProperty("level"));
196 }
197 }
198
199 connection.start();
200 int receivedCount=0;
201 for(int i=0; i<10 || i<msgCount; i++) {
202 for(MyClient client: clients) {
203 Message m=null;
204 do {
205 m = client.getMessage();
206 receivedCount += (m != null ? 1 : 0);
207 } while (m != null);
208 }
209 if (receivedCount == (3*msgCount)) { break; }
210 logger.debug("waiting for messages...");
211 Thread.sleep(10);
212 }
213 logger.info("asyncClient received {} msgs", asyncClient.getCount());
214 logger.info("syncClient received {} msgs", syncClient.getCount());
215 assertEquals(msgCount*3,
216 asyncClient.getCount()+ syncClient.getCount());
217 }
218 finally {
219 if (connection != null) { connection.stop(); }
220 if (asyncConsumer != null) { asyncConsumer.close(); }
221 if (syncConsumer != null) { syncConsumer.close(); }
222 if (producer != null) { producer.close(); }
223 if (session != null) { session.close(); }
224 if (asyncSession != null) { asyncSession.close(); }
225 }
226 }
227
228 }