View Javadoc
1   package ejava.examples.jms10.jmsmechanics;
2   
3   import static org.junit.Assert.*;
4   
5   
6   import java.util.ArrayList;
7   import java.util.LinkedList;
8   import java.util.List;
9   
10  import javax.jms.Destination;
11  import javax.jms.JMSException;
12  import javax.jms.Message;
13  import javax.jms.MessageConsumer;
14  import javax.jms.MessageListener;
15  import javax.jms.MessageProducer;
16  import javax.jms.Session;
17  import javax.jms.Queue;
18  
19  import org.slf4j.Logger;
20  import org.slf4j.LoggerFactory;
21  import org.junit.Before;
22  import org.junit.Test;
23  
24  /**
25   * This test case performs a demonstration of using a message selector with
26   * a MessageConsumer and a Queue. In the specific case tested, the same number
27   * of messages sent will be received. However, one of the clients will only
28   * receive 'warn' and 'fatal' messages and the other client will receive 
29   * 'info', 'warn', and 'fatal'. No one will receive 'debug'.
30   */
31  public class MessageSelectorQueueTest extends JMSTestBase {
32      static final Logger logger = LoggerFactory.getLogger(MessageSelectorQueueTest.class);
33      protected Destination destination;        
34  
35      @Before
36      public void setUp() throws Exception {
37          destination = (Queue) lookup(queueJNDI);
38          assertNotNull("null destination:" + queueJNDI, destination);
39      }
40      
41      private interface MyClient {
42          int getCount();
43          Message getMessage() throws Exception;
44      }
45      private class AsyncClient implements MessageListener, MyClient {
46          private int count=0;
47          private LinkedList<Message> messages = new LinkedList<Message>();
48          public void onMessage(Message message) {
49              try {
50                  logger.debug("onMessage received ({}):{}, level={}", 
51                          ++count , message.getJMSMessageID(),  message.getStringProperty("level"));
52                  message.acknowledge();
53                  synchronized(messages) {
54                      messages.add(message);
55                  }
56              } catch (JMSException ex) {
57                  logger.error("error handling message", ex);
58              }
59          }        
60          public int getCount() { return count; }
61          public Message getMessage() {
62              synchronized(messages) {
63                  return (messages.isEmpty() ? null : messages.remove());
64              }
65          }
66      }
67      
68      private class SyncClient implements MyClient {
69          private MessageConsumer consumer;
70          private int count=0;
71          public SyncClient(MessageConsumer consumer) {
72              this.consumer = consumer;
73          }
74          public int getCount() { return count; }
75          public Message getMessage() throws JMSException {
76              Message message=consumer.receiveNoWait();
77              if (message != null) {
78                  logger.debug("receive ({}):{}, level={}", 
79                          ++count , message.getJMSMessageID(),  message.getStringProperty("level"));
80                  message.acknowledge();
81              }
82              return message;
83          }
84      }
85  
86      @Test
87      public void testMessageSelector() throws Exception {
88          logger.info("*** testMessageSelector ***");
89          Session session = null;
90          Session asyncSession = null;
91          MessageProducer producer = null;
92          MessageConsumer asyncConsumer = null;
93          MessageConsumer syncConsumer = null;
94          try {
95              connection.stop();
96              //need to use CLIENT_ACK to avoid race condition within this app
97              session = connection.createSession(
98                      false, Session.CLIENT_ACKNOWLEDGE);
99              //each session must operate in one thread only
100             asyncSession = connection.createSession(
101                     false, Session.CLIENT_ACKNOWLEDGE);
102             List<MyClient> clients = new ArrayList<MyClient>();
103 
104             //create a client to asynchronous receive messages through 
105             //onMessage() callbacks
106             String selector1 = "level in ('warn', 'fatal')";
107             asyncConsumer = asyncSession.createConsumer(destination, selector1);
108             AsyncClient asyncClient = new AsyncClient();
109             asyncConsumer.setMessageListener(asyncClient);
110             clients.add(asyncClient);
111 
112             //create a client to synchronously poll for messages with 
113             //receive calls
114             String selector2 = "level in ('debug', 'info','warn', 'fatal')";
115             syncConsumer = session.createConsumer(destination, selector2);
116             SyncClient syncClient = new SyncClient(syncConsumer);
117             clients.add(syncClient);
118             
119             String levels[] = {"info", "warn", "fatal"}; //no "debug", 
120             producer = session.createProducer(destination);
121             Message message = session.createMessage();
122             for (String level : levels) {
123                 message.setStringProperty("level", level);
124                 producer.send(message);
125                 logger.info("sent msgId={}, level={}", 
126                         message.getJMSMessageID(), message.getStringProperty("level"));
127             }
128             
129             connection.start();
130             int receivedCount=0;
131             for(int i=0; i<10; i++) {
132                 for(MyClient client: clients) {
133                     Message m = client.getMessage();
134                     receivedCount += (m != null ? 1 : 0);
135                 }
136                 if (receivedCount == 3) { break; }
137                 logger.debug("waiting for messages...");
138                 Thread.sleep(1000);
139             }
140             logger.info("asyncClient received {} msgs", asyncClient.getCount());
141             logger.info("syncClient received {} msgs", syncClient.getCount());
142             assertEquals(3, asyncClient.getCount()+ syncClient.getCount());
143         }
144         finally {
145             if (connection != null) { connection.stop(); }
146             if (asyncConsumer != null) { asyncConsumer.close(); }
147             if (syncConsumer != null) { syncConsumer.close(); }
148             if (producer != null) { producer.close(); }
149             if (session != null)  { session.close(); }
150             if (asyncSession != null)  { asyncSession.close(); }
151         }
152     }
153     
154     @Test
155     public void testMessageSelectorMulti() throws Exception {
156         logger.info("*** testMessageSelectorMulti ***");
157         Session session = null;
158         Session asyncSession = null;
159         MessageProducer producer = null;
160         MessageConsumer asyncConsumer = null;
161         MessageConsumer syncConsumer = null;
162         try {
163             connection.stop();
164             //need to use CLIENT_ACK to avoid race condition within this app
165             session = connection.createSession(
166                     false, Session.CLIENT_ACKNOWLEDGE);
167             //each session must operate in one thread only
168             asyncSession = connection.createSession(
169                     false, Session.CLIENT_ACKNOWLEDGE);
170             List<MyClient> clients = new ArrayList<MyClient>();
171 
172             //create a client to asynchronous receive messages through 
173             //onMessage() callbacks
174             String selector1 = "level in ('warn', 'fatal')";
175             asyncConsumer = asyncSession.createConsumer(destination, selector1);
176             AsyncClient asyncClient = new AsyncClient();
177             asyncConsumer.setMessageListener(asyncClient);
178             clients.add(asyncClient);
179 
180             //create a client to synchronously poll for messages with 
181             //receive calls
182             String selector2 = "level in ('debug', 'info','warn', 'fatal')";
183             syncConsumer = session.createConsumer(destination, selector2);
184             SyncClient syncClient = new SyncClient(syncConsumer);
185             clients.add(syncClient);
186             
187             String levels[] = {"info", "warn", "fatal"}; //no "debug",             
188             producer = session.createProducer(destination);
189             Message message = session.createMessage();
190             for (int i=0; i<msgCount; i++) {
191                 for (String level : levels) {
192                     message.setStringProperty("level", level);
193                     producer.send(message);
194                     logger.info("sent msgId={}, level={}", 
195                             message.getJMSMessageID(), message.getStringProperty("level"));
196                 }
197             }
198             
199             connection.start();
200             int receivedCount=0;
201             for(int i=0; i<10 || i<msgCount; i++) {
202                 for(MyClient client: clients) {
203                     Message m=null;
204                     do {
205                        m = client.getMessage();
206                        receivedCount += (m != null ? 1 : 0);
207                     } while (m != null);
208                 }
209                 if (receivedCount == (3*msgCount)) { break; }
210                 logger.debug("waiting for messages...");
211                 Thread.sleep(10);
212             }
213             logger.info("asyncClient received {} msgs", asyncClient.getCount());
214             logger.info("syncClient received {} msgs", syncClient.getCount());
215             assertEquals(msgCount*3, 
216                     asyncClient.getCount()+ syncClient.getCount());
217         }
218         finally {
219             if (connection != null) { connection.stop(); }
220             if (asyncConsumer != null) { asyncConsumer.close(); }
221             if (syncConsumer != null) { syncConsumer.close(); }
222             if (producer != null) { producer.close(); }
223             if (session != null)  { session.close(); }
224             if (asyncSession != null)  { asyncSession.close(); }
225         }
226     }    
227     
228 }