View Javadoc
1   package ejava.examples.jms10.jmsmechanics;
2   
3   import static org.junit.Assert.*;
4   
5   import java.util.ArrayList;
6   import java.util.LinkedList;
7   import java.util.List;
8   
9   import javax.jms.Destination;
10  import javax.jms.JMSException;
11  import javax.jms.Message;
12  import javax.jms.MessageConsumer;
13  import javax.jms.MessageListener;
14  import javax.jms.MessageProducer;
15  import javax.jms.Session;
16  import javax.jms.Topic;
17  
18  import org.slf4j.Logger;
19  import org.slf4j.LoggerFactory;
20  import org.junit.Before;
21  import org.junit.Test;
22  
23  /**
24   * This test case performs a demonstration of using a message selector with
25   * a MessageConsumer and a Topic. In the specific case tested, the number
26   * of messages received will be 2x the number sent for 'warn' and 'fatal' plus
27   * 1x the number sent for 'info' plus zero for number sent for 'debug'.
28   */
29  public class MessageSelectorTopicTest extends JMSTestBase {
30      static final Logger logger = LoggerFactory.getLogger(MessageSelectorTopicTest.class);
31      protected Destination destination;        
32  
33      @Before
34      public void setUp() throws Exception {
35          destination = (Topic) lookup(topicJNDI);
36          assertNotNull("null destination:" + topicJNDI, destination);
37      }
38      
39      private interface MyClient {
40          int getCount();
41          Message getMessage() throws Exception;
42      }
43      private class AsyncClient implements MessageListener, MyClient {
44          private int count=0;
45          private LinkedList<Message> messages = new LinkedList<Message>();
46          public void onMessage(Message message) {
47              try {
48                  logger.debug("onMessage received ({}):{}, level={}", 
49                          ++count, message.getJMSMessageID(), message.getStringProperty("level"));
50                  message.acknowledge();
51                  synchronized(messages) {
52                      messages.add(message);
53                  }
54              } catch (JMSException ex) {
55                  logger.error("error handling message", ex);
56              }
57          }        
58          public int getCount() { return count; }
59          public Message getMessage() {
60              synchronized(messages) {
61                  return (messages.isEmpty() ? null : messages.remove());
62              }
63          }
64      }
65      
66      private class SyncClient implements MyClient {
67          private MessageConsumer consumer;
68          private int count=0;
69          public SyncClient(MessageConsumer consumer) {
70              this.consumer = consumer;
71          }
72          public int getCount() { return count; }
73          public Message getMessage() throws JMSException {
74              Message message=consumer.receiveNoWait();
75              if (message != null) {
76                  logger.debug("receive ({}):{}, level={}", 
77                  ++count, message.getJMSMessageID(), message.getStringProperty("level"));
78                  message.acknowledge();
79              }
80              return message;
81          }
82      }
83  
84      @Test
85      public void testMessageSelector() throws Exception {
86          logger.info("*** testMessageSelector ***");
87          Session session = null;
88          Session asyncSession = null;
89          MessageProducer producer = null;
90          MessageConsumer asyncConsumer = null;
91          MessageConsumer syncConsumer = null;
92          try {
93              connection.stop();
94              //need to use CLIENT_ACK to avoid race condition within this app
95              session = connection.createSession(
96                      false, Session.CLIENT_ACKNOWLEDGE);
97              asyncSession = connection.createSession(
98                      false, Session.CLIENT_ACKNOWLEDGE);
99              List<MyClient> clients = new ArrayList<MyClient>();
100 
101             //create a client to asynchronous receive messages through 
102             //onMessage() callbacks
103             String selector1 = "level in ('warn', 'fatal')";
104             asyncConsumer = asyncSession.createConsumer(destination, selector1);
105             AsyncClient asyncClient = new AsyncClient();
106             asyncConsumer.setMessageListener(asyncClient);
107             clients.add(asyncClient);
108 
109             //create a client to synchronously poll for messages with 
110             //receive calls
111             String selector2 = "level in ('debug', 'info','warn', 'fatal')";
112             syncConsumer = session.createConsumer(destination, selector2);
113             SyncClient syncClient = new SyncClient(syncConsumer);
114             clients.add(syncClient);
115             
116             String levels[] = {"info", "warn", "fatal"}; //no "debug"
117             producer = session.createProducer(destination);
118             Message message = session.createMessage();
119             for (String level : levels) {
120                 message.setStringProperty("level", level);
121                 producer.send(message);
122                 logger.info("sent msgId={}, level={}", 
123                 message.getJMSMessageID(), message.getStringProperty("level"));
124             }
125             
126             connection.start();
127             int receivedCount=0;
128             for(int i=0; i<10; i++) {
129                 for(MyClient client: clients) {
130                     Message m = client.getMessage();
131                     receivedCount += (m != null ? 1 : 0);
132                 }
133                 if (receivedCount == 5) { break; }
134                 logger.debug("waiting for messages...");
135                 Thread.sleep(1000);
136             }
137             assertEquals(2, asyncClient.getCount());
138             assertEquals(3, syncClient.getCount());
139         }
140         finally {
141             if (connection != null) { connection.stop(); }
142             if (asyncConsumer != null) { asyncConsumer.close(); }
143             if (syncConsumer != null) { syncConsumer.close(); }
144             if (producer != null) { producer.close(); }
145             if (session != null)  { session.close(); }
146             if (asyncSession != null)  { asyncSession.close(); }
147         }
148     }
149     
150     @Test
151     public void testMessageSelectorMulti() throws Exception {
152         logger.info("*** testMessageSelectorMulti ***");
153         Session session = null;
154         Session asyncSession = null;
155         MessageProducer producer = null;
156         MessageConsumer asyncConsumer = null;
157         MessageConsumer syncConsumer = null;
158         try {
159             connection.stop();
160             //need to use CLIENT_ACK to avoid race condition within this app
161             session = connection.createSession(
162                     false, Session.CLIENT_ACKNOWLEDGE);
163             asyncSession = connection.createSession(
164                     false, Session.CLIENT_ACKNOWLEDGE);
165             List<MyClient> clients = new ArrayList<MyClient>();
166 
167             //create a client to asynchronous receive messages through 
168             //onMessage() callbacks
169             String selector1 = "level in ('warn', 'fatal')";
170             asyncConsumer = asyncSession.createConsumer(destination, selector1);
171             AsyncClient asyncClient = new AsyncClient();
172             asyncConsumer.setMessageListener(asyncClient);
173             clients.add(asyncClient);
174 
175             //create a client to synchronously poll for messages with 
176             //receive calls
177             String selector2 = "level in ('debug', 'info','warn', 'fatal')";
178             syncConsumer = session.createConsumer(destination, selector2);
179             SyncClient syncClient = new SyncClient(syncConsumer);
180             clients.add(syncClient);
181             
182             String levels[] = {"info", "warn", "fatal"}; // no "debug",
183             producer = session.createProducer(destination);
184             Message message = session.createMessage();
185             for (int i=0; i<msgCount; i++) {
186                 for (String level : levels) {
187                     message.setStringProperty("level", level);
188                     producer.send(message);
189                     logger.info("sent msgId={}, level={}",
190                             message.getJMSMessageID(), message.getStringProperty("level"));
191                 }
192             }
193             
194             connection.start();
195             int receivedCount=0;
196             for(int i=0; i<10 || i<msgCount; i++) {
197                 for(MyClient client: clients) {
198                     Message m=null;
199                     do {
200                        m = client.getMessage();
201                        receivedCount += (m != null ? 1 : 0);
202                     } while (m != null);
203                 }
204                 if (receivedCount == (3*msgCount + 2*msgCount)) { break; }
205                 logger.debug("waiting for messages...");
206                 Thread.sleep(10);
207             }
208             assertEquals(msgCount*2, asyncClient.getCount());
209             assertEquals(msgCount*3, syncClient.getCount());
210         }
211         finally {
212             if (connection != null) { connection.stop(); }
213             if (asyncConsumer != null) { asyncConsumer.close(); }
214             if (syncConsumer != null) { syncConsumer.close(); }
215             if (producer != null) { producer.close(); }
216             if (session != null)  { session.close(); }
217             if (asyncSession != null)  { asyncSession.close(); }
218         }
219     }        
220 }