1 package ejava.examples.jms10.jmsmechanics;
2
3 import static org.junit.Assert.*;
4
5 import java.util.ArrayList;
6 import java.util.LinkedList;
7 import java.util.List;
8
9 import javax.jms.Destination;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageConsumer;
13 import javax.jms.MessageListener;
14 import javax.jms.MessageProducer;
15 import javax.jms.Session;
16 import javax.jms.Topic;
17
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 import org.junit.Before;
21 import org.junit.Test;
22
23
24
25
26
27
28
29 public class MessageSelectorTopicTest extends JMSTestBase {
30 static final Logger logger = LoggerFactory.getLogger(MessageSelectorTopicTest.class);
31 protected Destination destination;
32
33 @Before
34 public void setUp() throws Exception {
35 destination = (Topic) lookup(topicJNDI);
36 assertNotNull("null destination:" + topicJNDI, destination);
37 }
38
39 private interface MyClient {
40 int getCount();
41 Message getMessage() throws Exception;
42 }
43 private class AsyncClient implements MessageListener, MyClient {
44 private int count=0;
45 private LinkedList<Message> messages = new LinkedList<Message>();
46 public void onMessage(Message message) {
47 try {
48 logger.debug("onMessage received ({}):{}, level={}",
49 ++count, message.getJMSMessageID(), message.getStringProperty("level"));
50 message.acknowledge();
51 synchronized(messages) {
52 messages.add(message);
53 }
54 } catch (JMSException ex) {
55 logger.error("error handling message", ex);
56 }
57 }
58 public int getCount() { return count; }
59 public Message getMessage() {
60 synchronized(messages) {
61 return (messages.isEmpty() ? null : messages.remove());
62 }
63 }
64 }
65
66 private class SyncClient implements MyClient {
67 private MessageConsumer consumer;
68 private int count=0;
69 public SyncClient(MessageConsumer consumer) {
70 this.consumer = consumer;
71 }
72 public int getCount() { return count; }
73 public Message getMessage() throws JMSException {
74 Message message=consumer.receiveNoWait();
75 if (message != null) {
76 logger.debug("receive ({}):{}, level={}",
77 ++count, message.getJMSMessageID(), message.getStringProperty("level"));
78 message.acknowledge();
79 }
80 return message;
81 }
82 }
83
84 @Test
85 public void testMessageSelector() throws Exception {
86 logger.info("*** testMessageSelector ***");
87 Session session = null;
88 Session asyncSession = null;
89 MessageProducer producer = null;
90 MessageConsumer asyncConsumer = null;
91 MessageConsumer syncConsumer = null;
92 try {
93 connection.stop();
94
95 session = connection.createSession(
96 false, Session.CLIENT_ACKNOWLEDGE);
97 asyncSession = connection.createSession(
98 false, Session.CLIENT_ACKNOWLEDGE);
99 List<MyClient> clients = new ArrayList<MyClient>();
100
101
102
103 String selector1 = "level in ('warn', 'fatal')";
104 asyncConsumer = asyncSession.createConsumer(destination, selector1);
105 AsyncClient asyncClient = new AsyncClient();
106 asyncConsumer.setMessageListener(asyncClient);
107 clients.add(asyncClient);
108
109
110
111 String selector2 = "level in ('debug', 'info','warn', 'fatal')";
112 syncConsumer = session.createConsumer(destination, selector2);
113 SyncClient syncClient = new SyncClient(syncConsumer);
114 clients.add(syncClient);
115
116 String levels[] = {"info", "warn", "fatal"};
117 producer = session.createProducer(destination);
118 Message message = session.createMessage();
119 for (String level : levels) {
120 message.setStringProperty("level", level);
121 producer.send(message);
122 logger.info("sent msgId={}, level={}",
123 message.getJMSMessageID(), message.getStringProperty("level"));
124 }
125
126 connection.start();
127 int receivedCount=0;
128 for(int i=0; i<10; i++) {
129 for(MyClient client: clients) {
130 Message m = client.getMessage();
131 receivedCount += (m != null ? 1 : 0);
132 }
133 if (receivedCount == 5) { break; }
134 logger.debug("waiting for messages...");
135 Thread.sleep(1000);
136 }
137 assertEquals(2, asyncClient.getCount());
138 assertEquals(3, syncClient.getCount());
139 }
140 finally {
141 if (connection != null) { connection.stop(); }
142 if (asyncConsumer != null) { asyncConsumer.close(); }
143 if (syncConsumer != null) { syncConsumer.close(); }
144 if (producer != null) { producer.close(); }
145 if (session != null) { session.close(); }
146 if (asyncSession != null) { asyncSession.close(); }
147 }
148 }
149
150 @Test
151 public void testMessageSelectorMulti() throws Exception {
152 logger.info("*** testMessageSelectorMulti ***");
153 Session session = null;
154 Session asyncSession = null;
155 MessageProducer producer = null;
156 MessageConsumer asyncConsumer = null;
157 MessageConsumer syncConsumer = null;
158 try {
159 connection.stop();
160
161 session = connection.createSession(
162 false, Session.CLIENT_ACKNOWLEDGE);
163 asyncSession = connection.createSession(
164 false, Session.CLIENT_ACKNOWLEDGE);
165 List<MyClient> clients = new ArrayList<MyClient>();
166
167
168
169 String selector1 = "level in ('warn', 'fatal')";
170 asyncConsumer = asyncSession.createConsumer(destination, selector1);
171 AsyncClient asyncClient = new AsyncClient();
172 asyncConsumer.setMessageListener(asyncClient);
173 clients.add(asyncClient);
174
175
176
177 String selector2 = "level in ('debug', 'info','warn', 'fatal')";
178 syncConsumer = session.createConsumer(destination, selector2);
179 SyncClient syncClient = new SyncClient(syncConsumer);
180 clients.add(syncClient);
181
182 String levels[] = {"info", "warn", "fatal"};
183 producer = session.createProducer(destination);
184 Message message = session.createMessage();
185 for (int i=0; i<msgCount; i++) {
186 for (String level : levels) {
187 message.setStringProperty("level", level);
188 producer.send(message);
189 logger.info("sent msgId={}, level={}",
190 message.getJMSMessageID(), message.getStringProperty("level"));
191 }
192 }
193
194 connection.start();
195 int receivedCount=0;
196 for(int i=0; i<10 || i<msgCount; i++) {
197 for(MyClient client: clients) {
198 Message m=null;
199 do {
200 m = client.getMessage();
201 receivedCount += (m != null ? 1 : 0);
202 } while (m != null);
203 }
204 if (receivedCount == (3*msgCount + 2*msgCount)) { break; }
205 logger.debug("waiting for messages...");
206 Thread.sleep(10);
207 }
208 assertEquals(msgCount*2, asyncClient.getCount());
209 assertEquals(msgCount*3, syncClient.getCount());
210 }
211 finally {
212 if (connection != null) { connection.stop(); }
213 if (asyncConsumer != null) { asyncConsumer.close(); }
214 if (syncConsumer != null) { syncConsumer.close(); }
215 if (producer != null) { producer.close(); }
216 if (session != null) { session.close(); }
217 if (asyncSession != null) { asyncSession.close(); }
218 }
219 }
220 }